- return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName))
- .thenApply(o -> createBackendInfo(o, shardName, cookie));
+ final ShardState toInsert = resolveBackendInfo(shardName, cookie);
+
+ final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+ if (raced != null) {
+ // We have had a concurrent insertion, return that
+ LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+ return raced.getStage();
+ }
+
+ // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+ // complete
+ final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+ stage.whenComplete((info, failure) -> {
+ if (failure != null) {
+ LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+ backends.remove(cookie, toInsert);
+
+ // Remove cache state in case someone else forgot to invalidate it
+ flushCache(shardName);
+ }
+ });
+
+ return stage;