- return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, i);
- return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
+ final CompletableFuture<ShardBackendInfo> future = new CompletableFuture<>();
+ FutureConverters.toJava(actorUtils.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> {
+ if (failure == null) {
+ connectShard(shardName, cookie, info, future);
+ return;
+ }
+
+ LOG.debug("Shard {} failed to resolve", shardName, failure);
+ if (failure instanceof NoShardLeaderException) {
+ future.completeExceptionally(wrap("Shard has no current leader", failure));
+ } else if (failure instanceof NotInitializedException) {
+ // FIXME: this actually is an exception we can retry on
+ LOG.info("Shard {} has not initialized yet", shardName);
+ future.completeExceptionally(failure);
+ } else if (failure instanceof PrimaryNotFoundException) {
+ LOG.info("Failed to find primary for shard {}", shardName);
+ future.completeExceptionally(failure);
+ } else {
+ future.completeExceptionally(failure);