import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
- ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
- addShard(msg.shardName, msg.primaryFound, getSender());
+ } else if(message instanceof PrimaryShardFoundForContext) {
+ PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
+ onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
} else if (message instanceof SaveSnapshotSuccess) {
LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
} else if (message instanceof SaveSnapshotFailure) {
- LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure)message).cause());
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
+ persistenceId(), ((SaveSnapshotFailure) message).cause());
} else {
unknownMessage(message);
}
}
+ private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
+ if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
+ addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+ }
+ }
+
private void onShardReplicaRemoved(ServerRemoved message) {
final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
return false;
}
- private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+ private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
return;
}
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+
+ }
- final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
- (RemotePrimaryShardFound)response), sender);
- } else if(response instanceof LocalPrimaryShardFound) {
- sendLocalReplicaAlreadyExistsReply(shardName, sender);
- } else {
- String msg = String.format("Failed to find leader for shard %s: received response: %s",
- shardName, response);
- LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
- new RuntimeException(msg)), getSelf());
- }
- }
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+ });
}
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
}
}
- private static class ForwardedAddServerPrimaryShardFound {
- String shardName;
- RemotePrimaryShardFound primaryFound;
-
- ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
- this.shardName = shardName;
- this.primaryFound = primaryFound;
- }
- }
-
private static class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;
return Props.create(ShardManager.class, this);
}
}
+
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if(response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ /**
+ * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
+ * a remote or local find primary message is processed
+ */
+ private static interface FindPrimaryResponseHandler {
+ /**
+ * Invoked when a Failure message is received as a response
+ *
+ * @param failure
+ */
+ void onFailure(Throwable failure);
+
+ /**
+ * Invoked when a RemotePrimaryShardFound response is received
+ *
+ * @param response
+ */
+ void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
+
+ /**
+ * Invoked when a LocalPrimaryShardFound response is received
+ * @param response
+ */
+ void onLocalPrimaryFound(LocalPrimaryShardFound response);
+
+ /**
+ * Invoked when an unknown response is received. This is another type of failure.
+ *
+ * @param response
+ */
+ void onUnknownResponse(Object response);
+ }
+
+ /**
+ * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
+ * replica and sends a wrapped Failure response to some targetActor
+ */
+ private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+ private final ActorRef targetActor;
+ private final String shardName;
+ private final String persistenceId;
+ private final ActorRef shardManagerActor;
+
+ /**
+ * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
+ * @param shardName The name of the shard for which the primary replica had to be found
+ * @param persistenceId The persistenceId for the ShardManager
+ * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
+ */
+ protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+ this.targetActor = Preconditions.checkNotNull(targetActor);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+ }
+
+ public ActorRef getTargetActor() {
+ return targetActor;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getShardManagerActor() {
+ return shardManagerActor;
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+ targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
+ }
+
+ @Override
+ public void onUnknownResponse(Object response) {
+ String msg = String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), shardManagerActor);
+ }
+ }
+
+
+ /**
+ * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
+ * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
+ * as a successful response to find primary.
+ */
+ private static class PrimaryShardFoundForContext {
+ private final String shardName;
+ private final Object contextMessage;
+ private final RemotePrimaryShardFound remotePrimaryShardFound;
+ private final LocalPrimaryShardFound localPrimaryShardFound;
+
+ public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.contextMessage = Preconditions.checkNotNull(contextMessage);
+ Preconditions.checkNotNull(primaryFoundMessage);
+ this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
+ this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+ }
+
+ @Nonnull
+ public String getPrimaryPath(){
+ if(remotePrimaryShardFound != null){
+ return remotePrimaryShardFound.getPrimaryPath();
+ }
+ return localPrimaryShardFound.getPrimaryPath();
+ }
+
+ @Nonnull
+ public Object getContextMessage() {
+ return contextMessage;
+ }
+
+ @Nullable
+ public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+ return remotePrimaryShardFound;
+ }
+
+ @Nullable
+ public LocalPrimaryShardFound getLocalPrimaryShardFound(){
+ return localPrimaryShardFound;
+ }
+
+ boolean isPrimaryLocal(){
+ return (remotePrimaryShardFound == null);
+ }
+
+ @Nonnull
+ public String getShardName() {
+ return shardName;
+ }
+ }
}