- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- }));
+ private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
+ final CompletableFuture<ShardBackendInfo> future) {
+ LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
+
+ FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+ .whenComplete((response, failure) -> {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
+ response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));
+ });