- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+ .whenComplete((response, failure) -> onConnectResponse(shardName, cookie, future, response, failure));
+ }
+
+ private void onConnectResponse(final String shardName, final long cookie,
+ final CompletableFuture<ShardBackendInfo> future, final Object response, final Throwable failure) {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);
+ return;
+ }