- /**
- * Finds the primary for a given shard
- *
- * @param shardName
- * @return
- */
- public ActorSelection findPrimary(String shardName) {
- String path = findPrimaryPath(shardName);
- return actorSystem.actorSelection(path);
+ public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindPrimary(shardName, true), shardInitializationTimeout);
+
+ return future.transform(new Mapper<Object, ActorSelection>() {
+ @Override
+ public ActorSelection checkedApply(Object response) throws Exception {
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
+
+ LOG.debug("Primary found {}", found.getPrimaryPath());
+ ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+ primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+ return actorSelection;
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindPrimary returned unkown response: %s", response));
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());