+ public Optional<ActorSelection> findPrimaryShard(String shardName) {
+ String path = findPrimaryPathOrNull(shardName);
+ if (path == null){
+ return Optional.absent();
+ }
+ return Optional.of(actorSystem.actorSelection(path));
+ }
+
+ public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindPrimary(shardName, true).toSerializable(),
+ datastoreContext.getShardInitializationTimeout());
+
+ return future.transform(new Mapper<Object, ActorSelection>() {
+ @Override
+ public ActorSelection checkedApply(Object response) throws Exception {
+ if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound found = PrimaryFound.fromSerializable(response);
+
+ LOG.debug("Primary found {}", found.getPrimaryPath());
+ return actorSystem.actorSelection(found.getPrimaryPath());
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found primary shard %s but it's not initialized yet. " +
+ "Please try again later", shardName));
+ } else if(response instanceof PrimaryNotFound) {
+ throw new PrimaryNotFoundException(
+ String.format("No primary shard found for %S.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindPrimary returned unkown response: %s", response));
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }
+
+ /**
+ * Finds a local shard given its shard name and return it's ActorRef
+ *
+ * @param shardName the name of the local shard that needs to be found
+ * @return a reference to a local shard actor which represents the shard
+ * specified by the shardName
+ */
+ public Optional<ActorRef> findLocalShard(String shardName) {
+ Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
+
+ if (result instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound) result;
+ LOG.debug("Local shard found {}", found.getPath());
+ return Optional.of(found.getPath());
+ }
+
+ return Optional.absent();
+ }
+
+ /**
+ * Finds a local shard async given its shard name and return a Future from which to obtain the
+ * ActorRef.
+ *
+ * @param shardName the name of the local shard that needs to be found
+ */
+ public Future<ActorRef> findLocalShardAsync( final String shardName) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+
+ return future.map(new Mapper<Object, ActorRef>() {
+ @Override
+ public ActorRef checkedApply(Object response) throws Throwable {
+ if(response instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound)response;
+ LOG.debug("Local shard found {}", found.getPath());
+ return found.getPath();
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found local shard for %s but it's not initialized yet.",
+ shardName));
+ } else if(response instanceof LocalShardNotFound) {
+ throw new LocalShardNotFoundException(
+ String.format("Local shard for %s does not exist.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindLocalShard returned unkown response: %s", response));
+ }
+ }, getActorSystem().dispatcher());
+ }
+
+ private String findPrimaryPathOrNull(String shardName) {
+ Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
+
+ if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound found = PrimaryFound.fromSerializable(result);