import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private DatastoreSnapshot restoreFromSnapshot;
+ private ShardManagerSnapshot currentSnapshot;
+
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
/**
*/
- protected ShardManager(Builder builder) {
+ protected ShardManager(AbstractBuilder<?> builder) {
this.cluster = builder.cluster;
this.configuration = builder.configuration;
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
- ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
- addShard(msg.shardName, msg.primaryFound, getSender());
+ } else if(message instanceof PrimaryShardFoundForContext) {
+ PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
+ onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
+ } else if(message instanceof ServerRemoved){
+ onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if (message instanceof SaveSnapshotFailure) {
- LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure)message).cause());
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
+ persistenceId(), ((SaveSnapshotFailure) message).cause());
} else {
unknownMessage(message);
}
}
+ /**
+  * Handles a {@link PrimaryShardFoundForContext}, dispatching on the type of the original
+  * ("context") message that triggered the find-primary request.
+  *
+  * <p>Only {@code AddShardReplica} contexts are acted upon: the shard is added using the remote
+  * primary carried by the message, on behalf of the current sender. Any other context type is
+  * logged at debug level and otherwise ignored.
+  *
+  * @param primaryShardFoundContext the find-primary result paired with its originating message
+  */
+ private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
+     final Object contextMessage = primaryShardFoundContext.getContextMessage();
+     if(contextMessage instanceof AddShardReplica) {
+         // Go through the accessors rather than reaching into the DTO's private fields.
+         addShard(primaryShardFoundContext.getShardName(),
+                 primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+     } else {
+         // Make dropped messages visible instead of discarding them silently.
+         LOG.debug("{}: Ignoring PrimaryShardFoundForContext for shard {} with unhandled context message {}",
+                 persistenceId(), primaryShardFoundContext.getShardName(), contextMessage);
+     }
+ }
+
+ /**
+  * Processes a {@link ServerRemoved} notification: removes the named shard from the local shard
+  * map, sends a PoisonPill to its actor when one is present, and persists the updated shard list.
+  * When the shard is not in the local map nothing else is done.
+  *
+  * @param message the notification carrying the id string of the removed server
+  */
+ private void onShardReplicaRemoved(ServerRemoved message) {
+     final ShardIdentifier removedId =
+             new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+     final String removedShardName = removedId.getShardName();
+
+     final ShardInformation removedInfo = localShards.remove(removedShardName);
+     if(removedInfo == null) {
+         LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), removedId.toString());
+         return;
+     }
+
+     // The actor reference is nullable, so only stop it when it exists.
+     final ActorRef shardActor = removedInfo.getActor();
+     if(shardActor != null) {
+         LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardActor);
+         shardActor.tell(PoisonPill.getInstance(), self());
+     }
+
+     LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), removedShardName);
+     persistShardList();
+ }
+
private void onGetSnapshot() {
LOG.debug("{}: onGetSnapshot", persistenceId());
}
byte[] shardManagerSnapshot = null;
+ if(currentSnapshot != null) {
+ shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
+ }
+
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
}
private void onCreateShard(CreateShard createShard) {
+ LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
reply = new akka.actor.Status.Success(null);
}
} catch (Exception e) {
- LOG.error("onCreateShard failed", e);
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
reply = new akka.actor.Status.Failure(e);
}
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
+ currentSnapshot.getShardList().contains(shardName);
+
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+ contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
- // The local member is not in the given shard member configuration. In this case we'll create
+ // The local member is not in the static shard member configuration and the shard did not
+ // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
// the shard with no peers and with elections disabled so it stays as follower. A
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+ isActiveMember);
ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+ /**
+  * Handles recovery messages. A {@code RecoveryCompleted} message is delegated to
+  * {@code onRecoveryCompleted()}; a {@code SnapshotOffer} has its snapshot cast to
+  * {@code ShardManagerSnapshot} and applied. Any other message type is ignored by this method.
+  *
+  * @param message the recovery message to process
+  * @throws Exception declared by the overridden method
+  */
@Override
protected void handleRecover(Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
- createLocalShards();
+ onRecoveryCompleted();
} else if (message instanceof SnapshotOffer) {
- handleShardRecovery((SnapshotOffer) message);
+ applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
}
}
+ /**
+  * Completes recovery: deletes the journal messages up to the last sequence number, optionally
+  * applies the ShardManager snapshot supplied for restore, and then creates the local shards.
+  *
+  * <p>The restore snapshot is only consulted when no snapshot was applied during recovery
+  * ({@code currentSnapshot == null}) and serialized snapshot bytes are available. A failure to
+  * deserialize or apply it is logged and does not prevent the local shards from being created.
+  */
+ private void onRecoveryCompleted() {
+     LOG.info("Recovery complete : {}", persistenceId());
+
+     // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+     // journal on upgrade from Helium.
+     deleteMessages(lastSequenceNr());
+
+     final boolean restoreAvailable = currentSnapshot == null && restoreFromSnapshot != null &&
+             restoreFromSnapshot.getShardManagerSnapshot() != null;
+     if(restoreAvailable) {
+         try(ObjectInputStream snapshotStream = new ObjectInputStream(new ByteArrayInputStream(
+                 restoreFromSnapshot.getShardManagerSnapshot()))) {
+             final ShardManagerSnapshot restored = (ShardManagerSnapshot) snapshotStream.readObject();
+
+             LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), restored);
+
+             applyShardManagerSnapshot(restored);
+         } catch(Exception e) {
+             LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
+         }
+     }
+
+     createLocalShards();
+ }
+
private void findLocalShard(FindLocalShard message) {
final ShardInformation shardInformation = localShards.get(message.getShardName());
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+ LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
return false;
}
- private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+ private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
return;
}
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+
+ }
- final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
- (RemotePrimaryShardFound)response), sender);
- } else if(response instanceof LocalPrimaryShardFound) {
- sendLocalReplicaAlreadyExistsReply(shardName, sender);
- } else {
- String msg = String.format("Failed to find leader for shard %s: received response: %s",
- shardName, response);
- LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
- new RuntimeException(msg)), getSelf());
- }
- }
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+ });
}
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
}
}
LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(new ShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList));
}
- private void handleShardRecovery(SnapshotOffer offer) {
- LOG.debug ("{}: in handleShardRecovery", persistenceId());
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ /**
+  * Builds a new {@code ShardManagerSnapshot} from the given shard names and records it as the
+  * current snapshot.
+  *
+  * @param shardList the shard names to capture in the snapshot
+  * @return the newly created snapshot, which is also stored in {@code currentSnapshot}
+  */
+ private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+     final ShardManagerSnapshot updated = new ShardManagerSnapshot(shardList);
+     currentSnapshot = updated;
+     return updated;
+ }
+
+ private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ currentSnapshot = snapshot;
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+
String currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
- for (String shard : snapshot.getShardList()) {
+ for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
LOG.debug ("{}: adding shard {}", persistenceId(), shard);
}
}
- private static class ForwardedAddServerPrimaryShardFound {
- String shardName;
- RemotePrimaryShardFound primaryFound;
-
- ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
- this.shardName = shardName;
- this.primaryFound = primaryFound;
- }
+ /**
+  * Handles the confirmation that a ShardManager snapshot was saved by requesting deletion of
+  * previously saved snapshots.
+  *
+  * @param successMessage the save confirmation; its metadata timestamp bounds the deletion
+  */
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ // Criteria arguments are (max sequence number, max timestamp): no sequence-number limit, and a
+ // timestamp bound one below the snapshot just saved so that snapshot itself is excluded.
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
}
private static class ForwardedAddServerReply {
return shardName;
}
+ @Nullable
ActorRef getActor(){
return actor;
}
return new Builder();
}
- public static class Builder {
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
private ClusterWrapper cluster;
private Configuration configuration;
private DatastoreContextFactory datastoreContextFactory;
private DatastoreSnapshot restoreFromSnapshot;
private volatile boolean sealed;
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
+ }
+
protected void checkSealed() {
Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
}
- public Builder cluster(ClusterWrapper cluster) {
+ public T cluster(ClusterWrapper cluster) {
checkSealed();
this.cluster = cluster;
- return this;
+ return self();
}
- public Builder configuration(Configuration configuration) {
+ public T configuration(Configuration configuration) {
checkSealed();
this.configuration = configuration;
- return this;
+ return self();
}
- public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+ public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
checkSealed();
this.datastoreContextFactory = datastoreContextFactory;
- return this;
+ return self();
}
- public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+ public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
checkSealed();
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- return this;
+ return self();
}
- public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
checkSealed();
this.primaryShardInfoCache = primaryShardInfoCache;
- return this;
+ return self();
}
- public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+ public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
checkSealed();
this.restoreFromSnapshot = restoreFromSnapshot;
- return this;
+ return self();
}
protected void verify() {
return Props.create(ShardManager.class, this);
}
}
+
+ /**
+  * Concrete builder that binds the self type of {@link AbstractBuilder} to itself; it adds no
+  * members of its own.
+  */
+ public static class Builder extends AbstractBuilder<Builder> {
+ }
+
+ /**
+  * Sends a {@code FindPrimary} request for the given shard to this actor and routes the eventual
+  * outcome to the matching callback of the supplied handler.
+  *
+  * <p>The request times out after twice the configured shard initialization timeout. The
+  * callbacks run on the Client dispatcher once the ask future completes.
+  *
+  * @param shardName the name of the shard whose primary is to be located
+  * @param handler receives the failure, remote-primary, local-primary or unknown-response outcome
+  */
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+     final Timeout askTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+             getShardInitializationTimeout().duration().$times(2));
+
+     final Future<Object> reply = ask(getSelf(), new FindPrimary(shardName, true), askTimeout);
+     reply.onComplete(new OnComplete<Object>() {
+         @Override
+         public void onComplete(Throwable failure, Object response) {
+             // A failed ask takes precedence over whatever the response holds.
+             if (failure != null) {
+                 handler.onFailure(failure);
+                 return;
+             }
+
+             if(response instanceof RemotePrimaryShardFound) {
+                 handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+                 return;
+             }
+
+             if(response instanceof LocalPrimaryShardFound) {
+                 handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+                 return;
+             }
+
+             handler.onUnknownResponse(response);
+         }
+     }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ /**
+ * The FindPrimaryResponseHandler provides specific callback methods which are invoked when the response to
+ * a find primary request is processed. Exactly one callback is invoked per response.
+ */
+ private static interface FindPrimaryResponseHandler {
+ /**
+ * Invoked when a Failure message is received as a response
+ *
+ * @param failure the cause of the failed find primary request
+ */
+ void onFailure(Throwable failure);
+
+ /**
+ * Invoked when a RemotePrimaryShardFound response is received
+ *
+ * @param response the reply identifying a primary shard that is remote
+ */
+ void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
+
+ /**
+ * Invoked when a LocalPrimaryShardFound response is received
+ *
+ * @param response the reply identifying a primary shard that is local
+ */
+ void onLocalPrimaryFound(LocalPrimaryShardFound response);
+
+ /**
+ * Invoked when an unknown response is received. This is another type of failure.
+ *
+ * @param response the reply that was neither a RemotePrimaryShardFound nor a LocalPrimaryShardFound
+ */
+ void onUnknownResponse(Object response);
+ }
+
+ /**
+  * Base {@link FindPrimaryResponseHandler} that implements the two failure callbacks. Both log the
+  * problem and send an {@code akka.actor.Status.Failure} to the target actor, with the ShardManager
+  * actor as the sender. Subclasses supply the callbacks for the successful outcomes.
+  */
+ private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+     private final ActorRef targetActor;
+     private final String shardName;
+     private final String persistenceId;
+     private final ActorRef shardManagerActor;
+
+     /**
+      * Creates the handler; every argument must be non-null.
+      *
+      * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
+      * @param shardName The name of the shard for which the primary replica had to be found
+      * @param persistenceId The persistenceId for the ShardManager
+      * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
+      */
+     protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+         this.targetActor = Preconditions.checkNotNull(targetActor);
+         this.shardName = Preconditions.checkNotNull(shardName);
+         this.persistenceId = Preconditions.checkNotNull(persistenceId);
+         this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+     }
+
+     /** @return the actor that receives Failure replies */
+     public ActorRef getTargetActor() {
+         return targetActor;
+     }
+
+     /** @return the name of the shard whose primary is being looked up */
+     public String getShardName() {
+         return shardName;
+     }
+
+     /** @return the ShardManager actor used as the sender of Failure replies */
+     public ActorRef getShardManagerActor() {
+         return shardManagerActor;
+     }
+
+     /**
+      * Logs the failure and reports it to the target actor wrapped in a RuntimeException.
+      */
+     @Override
+     public void onFailure(Throwable failure) {
+         LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+
+         final String description = String.format("Failed to find leader for shard %s", shardName);
+         final RuntimeException wrapped = new RuntimeException(description, failure);
+         targetActor.tell(new akka.actor.Status.Failure(wrapped), shardManagerActor);
+     }
+
+     /**
+      * Logs the unexpected response and reports it to the target actor. A response that is itself
+      * a Throwable is forwarded as the cause; anything else is described in a new RuntimeException.
+      */
+     @Override
+     public void onUnknownResponse(Object response) {
+         final String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                 shardName, response);
+         LOG.debug ("{}: {}", persistenceId, msg);
+
+         final Throwable cause;
+         if(response instanceof Throwable) {
+             cause = (Throwable) response;
+         } else {
+             cause = new RuntimeException(msg);
+         }
+         targetActor.tell(new akka.actor.Status.Failure(cause), shardManagerActor);
+     }
+ }
+
+
+ /**
+  * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
+  * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
+  * as a successful response to find primary.
+  *
+  * <p>Exactly one of the remote and local primary-found messages is non-null on every instance.
+  */
+ private static class PrimaryShardFoundForContext {
+     private final String shardName;
+     private final Object contextMessage;
+     private final RemotePrimaryShardFound remotePrimaryShardFound;
+     private final LocalPrimaryShardFound localPrimaryShardFound;
+
+     /**
+      * @param shardName the name of the shard whose primary was found
+      * @param contextMessage the message that triggered the find-primary request
+      * @param primaryFoundMessage the find-primary reply; must be a {@code RemotePrimaryShardFound}
+      *        or a {@code LocalPrimaryShardFound}
+      * @throws NullPointerException if any argument is null
+      * @throws IllegalArgumentException if primaryFoundMessage is of neither supported type
+      */
+     public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+         this.shardName = Preconditions.checkNotNull(shardName);
+         this.contextMessage = Preconditions.checkNotNull(contextMessage);
+         Preconditions.checkNotNull(primaryFoundMessage);
+         // Reject unsupported reply types up front; otherwise both typed fields would be null,
+         // getPrimaryPath() would throw a NullPointerException and isPrimaryLocal() would
+         // wrongly report a local primary.
+         Preconditions.checkArgument(primaryFoundMessage instanceof RemotePrimaryShardFound ||
+                 primaryFoundMessage instanceof LocalPrimaryShardFound,
+                 "Unsupported primary found message type: %s", primaryFoundMessage.getClass().getName());
+         this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
+         this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+     }
+
+     /** @return the primary path reported by whichever primary-found message is present */
+     @Nonnull
+     public String getPrimaryPath(){
+         if(remotePrimaryShardFound != null){
+             return remotePrimaryShardFound.getPrimaryPath();
+         }
+         return localPrimaryShardFound.getPrimaryPath();
+     }
+
+     /** @return the message that triggered the find-primary request */
+     @Nonnull
+     public Object getContextMessage() {
+         return contextMessage;
+     }
+
+     /** @return the remote primary-found reply, or null when the primary is local */
+     @Nullable
+     public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+         return remotePrimaryShardFound;
+     }
+
+     /** @return the local primary-found reply, or null when the primary is remote */
+     @Nullable
+     public LocalPrimaryShardFound getLocalPrimaryShardFound(){
+         return localPrimaryShardFound;
+     }
+
+     /** @return true when the reply was a local primary-found message */
+     boolean isPrimaryLocal(){
+         return (localPrimaryShardFound != null);
+     }
+
+     /** @return the name of the shard whose primary was found */
+     @Nonnull
+     public String getShardName() {
+         return shardName;
+     }
+ }
}