- FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
- }
+ final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+ if (raced != null) {
+ // We have had a concurrent insertion, return that
+ LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+ return raced.getStage();
+ }