- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- }));
+ private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
+ final CompletableFuture<ShardBackendInfo> future) {
+ LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
+
+ FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+ .whenComplete((response, failure) -> {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed", shardName, failure);
+ future.completeExceptionally(failure);
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ future.completeExceptionally(cause);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}",
+ response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));
+ });