- .whenComplete((response, failure) -> {
- if (failure != null) {
- LOG.debug("Connect attempt to {} failed", shardName, failure);
- future.completeExceptionally(failure);
- return;
- }
- if (response instanceof RequestFailure) {
- final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
- LOG.debug("Connect attempt to {} failed to process", shardName, cause);
- future.completeExceptionally(cause);
- return;
- }
-
- LOG.debug("Resolved backend information to {}", response);
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}",
- response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
- future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
- success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
- success.getMaxMessages()));
- });
+ .whenComplete((response, failure) -> onConnectResponse(shardName, cookie, future, response, failure));
+ }
+
+ private void onConnectResponse(final String shardName, final long cookie,
+ final CompletableFuture<ShardBackendInfo> future, final Object response, final Throwable failure) {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+ checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s", response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));