+
+ public static class Builder extends AbstractBuilder<Builder> {
+ }
+
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if(response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ /**
+ * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
+ * a remote or local find primary message is processed
+ */
+ private static interface FindPrimaryResponseHandler {
+ /**
+ * Invoked when a Failure message is received as a response
+ *
+ * @param failure
+ */
+ void onFailure(Throwable failure);
+
+ /**
+ * Invoked when a RemotePrimaryShardFound response is received
+ *
+ * @param response
+ */
+ void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
+
+ /**
+ * Invoked when a LocalPrimaryShardFound response is received
+ * @param response
+ */
+ void onLocalPrimaryFound(LocalPrimaryShardFound response);
+
+ /**
+ * Invoked when an unknown response is received. This is another type of failure.
+ *
+ * @param response
+ */
+ void onUnknownResponse(Object response);
+ }
+
+ /**
+ * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
+ * replica and sends a wrapped Failure response to some targetActor
+ */
+ private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+ private final ActorRef targetActor;
+ private final String shardName;
+ private final String persistenceId;
+ private final ActorRef shardManagerActor;
+
+ /**
+ * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
+ * @param shardName The name of the shard for which the primary replica had to be found
+ * @param persistenceId The persistenceId for the ShardManager
+ * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
+ */
+ protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+ this.targetActor = Preconditions.checkNotNull(targetActor);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+ }
+
+ public ActorRef getTargetActor() {
+ return targetActor;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+ targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
+ }
+
+ @Override
+ public void onUnknownResponse(Object response) {
+ String msg = String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), shardManagerActor);
+ }
+ }
+
+
+ /**
+ * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
+ * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
+ * as a successful response to find primary.
+ */
+ private static class PrimaryShardFoundForContext {
+ private final String shardName;
+ private final Object contextMessage;
+ private final RemotePrimaryShardFound remotePrimaryShardFound;
+ private final LocalPrimaryShardFound localPrimaryShardFound;
+
+ public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
+ @Nonnull Object primaryFoundMessage) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.contextMessage = Preconditions.checkNotNull(contextMessage);
+ Preconditions.checkNotNull(primaryFoundMessage);
+ this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
+ (RemotePrimaryShardFound) primaryFoundMessage : null;
+ this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
+ (LocalPrimaryShardFound) primaryFoundMessage : null;
+ }
+
+ @Nonnull
+ String getPrimaryPath(){
+ if(remotePrimaryShardFound != null) {
+ return remotePrimaryShardFound.getPrimaryPath();
+ }
+ return localPrimaryShardFound.getPrimaryPath();
+ }
+
+ @Nonnull
+ Object getContextMessage() {
+ return contextMessage;
+ }
+
+ @Nullable
+ RemotePrimaryShardFound getRemotePrimaryShardFound() {
+ return remotePrimaryShardFound;
+ }
+
+ @Nonnull
+ String getShardName() {
+ return shardName;
+ }
+ }
+
+ /**
+ * The WrappedShardResponse class wraps a response from a Shard.
+ */
+ private static class WrappedShardResponse {
+ private final String shardName;
+ private final Object response;
+
+ private WrappedShardResponse(String shardName, Object response) {
+ this.shardName = shardName;
+ this.response = response;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ Object getResponse() {
+ return response;
+ }
+ }