import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.pattern.Patterns;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+ private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof DeleteSnapshotsFailure) {
+ LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
+ ((DeleteSnapshotsFailure) message).cause());
+ } else if (message instanceof DeleteSnapshotsSuccess) {
+ LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
+ } else if (message instanceof RegisterRoleChangeListenerReply) {
+ LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+ } else if (message instanceof ClusterEvent.MemberEvent) {
+ LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
} else {
unknownMessage(message);
}
}
private void onShardReplicaRemoved(ServerRemoved message) {
- final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
- final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeShard(final ShardIdentifier shardId) {
+ final String shardName = shardId.getShardName();
+ final ShardInformation shardInformation = localShards.remove(shardName);
if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if (shardInformation.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
- LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+ final ActorRef shardActor = shardInformation.getActor();
+ if (shardActor != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
+ FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3);
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
+ shardActorStoppingFutures.put(shardName, stopFuture);
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ if (failure == null) {
+ LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+ } else {
+ LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+ }
+
+ self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
+ ActorRef.noSender());
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
persistShardList();
}
LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
final PrefixShardConfiguration config = message.getConfiguration();
-
final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
+ if (isPreviousShardActorStopInProgress(shardName, message)) {
+ return;
+ }
+
if (localShards.containsKey(shardName)) {
LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
final PrefixShardConfiguration existing =
doCreatePrefixShard(config, shardId, shardName);
}
+ private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+ final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
+ if (stopFuture == null) {
+ return false;
+ }
+
+ LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+ shardName, messageToDefer);
+ final ActorRef sender = getSender();
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ self().tell(messageToDefer, sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+ return true;
+ }
+
private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
configuration.addPrefixShardConfiguration(config);
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
- final Map<String, String> peerAddresses = Collections.emptyMap();
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
final boolean isActiveMember = true;
LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
final DOMDataTreeIdentifier prefix = message.getPrefix();
final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
- final ShardInformation shard = localShards.remove(shardId.getShardName());
configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
-
- LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
- persistShardList();
+ removeShard(shardId);
}
private void doCreateShard(final CreateShard createShard) {
private void onRecoveryCompleted() {
LOG.info("Recovery complete : {}", persistenceId());
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
-
if (currentSnapshot == null && restoreFromSnapshot != null
&& restoreFromSnapshot.getShardManagerSnapshot() != null) {
ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
}
@VisibleForTesting
- protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
- return getContext().actorOf(info.newProps(schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
+ return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+ info.getShardId().toString());
}
private void findPrimary(FindPrimary message) {
getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(),
- response, getSender()), getTargetActor());
+ final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+ message.getShardPrefix(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
+ final RunnableMessage runnable = (RunnableMessage) () ->
+ addShard(getShardName(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
-
});
}