+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindPrimary(shardName, true), shardInitializationTimeout);
+
+ return future.transform(new Mapper<Object, PrimaryShardInfo>() {
+ @Override
+ public PrimaryShardInfo checkedApply(Object response) throws Exception {
+ if(response instanceof RemotePrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+ found.getLocalShardDataTree());
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindPrimary returned unkown response: %s", response));
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
+ }
+
+ private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+ short primaryVersion, DataTree localShardDataTree) {
+ ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
+ Optional.fromNullable(localShardDataTree));
+ primaryShardInfoCache.putSuccessful(shardName, info);
+ return info;