+ public ListenableFuture<RpcResult<LocateShardOutput>> locateShard(final LocateShardInput input) {
+ final ActorUtils utils;
+ switch (input.getDataStoreType()) {
+ case Config:
+ utils = configDataStore.getActorUtils();
+ break;
+ case Operational:
+ utils = operDataStore.getActorUtils();
+ break;
+ default:
+ return newFailedRpcResultFuture("Unhandled datastore in " + input);
+ }
+
+ final SettableFuture<RpcResult<LocateShardOutput>> ret = SettableFuture.create();
+ utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete<PrimaryShardInfo>() {
+ @Override
+ public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable {
+ if (failure != null) {
+ LOG.debug("Failed to find shard for {}", input, failure);
+ ret.setException(failure);
+ return;
+ }
+
+ // Data tree implies local leak
+ if (success.getLocalShardDataTree().isPresent()) {
+ ret.set(LOCAL_SHARD_RESULT);
+ return;
+ }
+
+ final ActorSelection actorPath = success.getPrimaryShardActor();
+ ret.set(newSuccessfulResult(new LocateShardOutputBuilder()
+ .setMemberNode(new LeaderActorRefBuilder()
+ .setLeaderActorRef(actorPath.toSerializationFormat())
+ .build())
+ .build()));
+ }
+ }, utils.getClientDispatcher());
+
+ return ret;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<MakeLeaderLocalOutput>> makeLeaderLocal(final MakeLeaderLocalInput input) {