- FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
+ // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+ // complete
+ final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+ stage.whenComplete((info, failure) -> {
+ if (failure != null) {
+ LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+ backends.remove(cookie, toInsert);
+
+ // Remove cache state in case someone else forgot to invalidate it
+ flushCache(shardName);
+ }
+ });
+
+ return stage;
+ }
+
+ @Override
+ public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
+ final ShardBackendInfo staleInfo) {
+ final ShardState existing = backends.get(cookie);
+ if (existing != null) {
+ if (!staleInfo.equals(existing.getResult())) {
+ return existing.getStage();