package org.opendaylight.controller.cluster.datastore;
import static akka.pattern.Patterns.ask;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
+import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@Override
public void postStop() {
- LOG.info("Stopping ShardManager");
+ LOG.info("Stopping ShardManager {}", persistenceId());
mBean.unregisterMBean();
}
} else if(message instanceof PrimaryShardFoundForContext) {
PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
onPrimaryShardFoundContext(primaryShardFoundContext);
- } else if(message instanceof RemoveShardReplica){
- onRemoveShardReplica((RemoveShardReplica)message);
+ } else if(message instanceof RemoveShardReplica) {
+ onRemoveShardReplica((RemoveShardReplica) message);
+ } else if(message instanceof WrappedShardResponse){
+ onWrappedShardResponse((WrappedShardResponse) message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
- } else if (message instanceof SaveSnapshotSuccess) {
+ } else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if (message instanceof SaveSnapshotFailure) {
+ } else if(message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
+ } else if(message instanceof Shutdown) {
+ onShutDown();
} else {
unknownMessage(message);
}
}
+ /**
+  * Handles a {@code Shutdown} message. Issues a graceful stop (carrying {@code Shutdown.INSTANCE}) to
+  * every local shard that has an actor, using twice the shard's election timeout interval as the stop
+  * timeout, and sends a PoisonPill to this actor once the combined stop future completes - whether it
+  * completed successfully or with a failure.
+  */
+ private void onShutDown() {
+     final List<Future<Boolean>> pendingStops = new ArrayList<>(localShards.size());
+     for (ShardInformation shard : localShards.values()) {
+         final ActorRef shardActor = shard.getActor();
+         if (shardActor == null) {
+             // No actor has been set for this shard, so there is nothing to stop.
+             continue;
+         }
+
+         LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), shard.getShardId());
+
+         final FiniteDuration stopTimeout =
+                 shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+         pendingStops.add(Patterns.gracefulStop(shardActor, stopTimeout, Shutdown.INSTANCE));
+     }
+
+     LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), pendingStops.size());
+
+     final ExecutionContext dispatcher =
+             new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+     final Future<Iterable<Boolean>> allStopped = Futures.sequence(pendingStops, dispatcher);
+
+     allStopped.onComplete(new OnComplete<Iterable<Boolean>>() {
+         @Override
+         public void onComplete(Throwable failure, Iterable<Boolean> results) {
+             LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+             // Stop this actor regardless of the outcome; the outcome is only reported below.
+             self().tell(PoisonPill.getInstance(), self());
+
+             if (failure != null) {
+                 LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+                 return;
+             }
+
+             int ungraceful = 0;
+             for (Boolean stopped : results) {
+                 if (!stopped) {
+                     ungraceful++;
+                 }
+             }
+
+             if (ungraceful > 0) {
+                 LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), ungraceful);
+             }
+         }
+     }, dispatcher);
+ }
+
+ /**
+  * Dispatches a response that was wrapped and sent back to this actor. Only {@code RemoveServerReply}
+  * payloads are acted upon; any other payload is ignored.
+  *
+  * @param message the wrapper holding the shard id, the response and the leader path
+  */
+ private void onWrappedShardResponse(WrappedShardResponse message) {
+     final Object shardResponse = message.getResponse();
+     if (!(shardResponse instanceof RemoveServerReply)) {
+         return;
+     }
+
+     onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) shardResponse,
+             message.getLeaderPath());
+ }
+
+ /**
+  * Handles the leader's reply to a RemoveServer request and reports the outcome to the original
+  * requester: a {@code Status.Success} when the reply status is OK, otherwise a {@code Status.Failure}
+  * carrying the exception built by {@code getServerChangeException}.
+  *
+  * @param originalSender the actor that requested the replica removal
+  * @param shardId identifier of the shard replica that was to be removed
+  * @param replyMsg the reply received for the RemoveServer request
+  * @param leaderPath path of the leader the request was sent to, used in failure messages
+  */
+ private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+         String leaderPath) {
+     // removeShardReplica registers the in-progress marker under the shard *name*, so it must be
+     // cleared with the same key. Removing by the ShardIdentifier object never matches and would
+     // leave the shard permanently flagged as having an operation in progress.
+     shardReplicaOperationsInProgress.remove(shardId.getShardName());
+
+     LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+         LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+                 shardId.getShardName());
+         originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+     } else {
+         LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+                 persistenceId(), shardId, replyMsg.getStatus());
+
+         Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+                 leaderPath, shardId);
+         originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+     }
+ }
+
private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+ addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
+ getSender());
+ } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
+ removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
+ primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
}
}
+ /**
+  * Asks the shard leader at {@code primaryPath} to remove the replica identified by the member name in
+  * {@code contextMessage} and the given shard name. Returns without doing anything when
+  * {@code isShardReplicaOperationInProgress} reports an operation already in progress for the shard.
+  * A reply from the leader is wrapped and sent back to this actor with {@code sender} as the message
+  * sender; a failed ask is reported directly to {@code sender} as a {@code Status.Failure}.
+  *
+  * NOTE(review): the failure branch below does not clear the in-progress marker added by this method;
+  * whether it is cleared elsewhere is not visible from here - confirm.
+  *
+  * @param contextMessage the original removal request, supplying the member name
+  * @param shardName name of the shard whose replica is to be removed
+  * @param primaryPath actor path of the shard leader
+  * @param sender the actor that requested the removal
+  */
+ private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+         final ActorRef sender) {
+     if (isShardReplicaOperationInProgress(shardName, sender)) {
+         return;
+     }
+
+     shardReplicaOperationsInProgress.add(shardName);
+
+     final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+     final DatastoreContext shardContext = newShardDatastoreContextBuilder(shardName).build();
+
+     // Inform the shard leader to remove this shard as a replica by sending a RemoveServer message.
+     LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+             primaryPath, shardId);
+
+     final Timeout askTimeout = new Timeout(shardContext.getShardLeaderElectionTimeout().duration());
+     final Future<Object> leaderReply = ask(getContext().actorSelection(primaryPath),
+             new RemoveServer(shardId.toString()), askTimeout);
+
+     leaderReply.onComplete(new OnComplete<Object>() {
+         @Override
+         public void onComplete(Throwable failure, Object response) {
+             if (failure == null) {
+                 // The leader answered: hand the reply back to this actor for processing,
+                 // keeping the original requester as the message sender.
+                 self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+                 return;
+             }
+
+             final String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+                     primaryPath, shardName);
+
+             LOG.debug ("{}: {}", persistenceId(), msg, failure);
+
+             // The ask itself failed: report straight to the requester.
+             sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+         }
+     }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private void onShardReplicaRemoved(ServerRemoved message) {
final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
} else if(shardInformation.getActor() != null) {
- LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+ shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
persistShardList();
}
private void memberRemoved(ClusterEvent.MemberRemoved message) {
- String memberName = message.member().roles().head();
+ String memberName = message.member().roles().iterator().next();
LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberExited(ClusterEvent.MemberExited message) {
- String memberName = message.member().roles().head();
+ String memberName = message.member().roles().iterator().next();
LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().head();
+ String memberName = message.member().roles().iterator().next();
LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberReachable(ClusterEvent.ReachableMember message) {
- String memberName = message.member().roles().head();
+ String memberName = message.member().roles().iterator().next();
LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
}
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
- String memberName = message.member().roles().head();
+ String memberName = message.member().roles().iterator().next();
LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
continue;
}
- LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
- shardName, address);
+ LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
+ persistenceId(), shardName, address, visitedAddresses);
getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
message.isWaitUntilReady(), visitedAddresses), getContext());
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
-
}
@Override
LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- Exception failure;
- switch (replyMsg.getStatus()) {
- case TIMEOUT:
- failure = new TimeoutException(String.format(
- "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
- "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardName));
- break;
- case NO_LEADER:
- failure = createNoShardLeaderException(shardInfo.getShardId());
- break;
- default :
- failure = new RuntimeException(String.format(
- "AddServer request to leader %s for shard %s failed with status %s",
- leaderPath, shardName, replyMsg.getStatus()));
- }
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
}
- private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
- String shardName = shardReplicaMsg.getShardName();
-
- // verify the local shard replica is available in the controller node
- if (!localShards.containsKey(shardName)) {
- String msg = String.format("Local shard %s does not", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
- return;
+ /**
+  * Builds the exception reported to the requester when a server-change request fails.
+  * TIMEOUT yields a {@code TimeoutException}, NO_LEADER the exception from
+  * {@code createNoShardLeaderException}, NOT_SUPPORTED an {@code UnsupportedOperationException},
+  * and any other status a {@code RuntimeException} naming the request type and status.
+  *
+  * @param serverChange the request class; its simple name is used in the messages
+  * @param serverChangeStatus the status returned for the request
+  * @param leaderPath path of the leader the request was sent to
+  * @param shardId identifier of the affected shard
+  * @return the exception describing the failure
+  */
+ private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+ String leaderPath, ShardIdentifier shardId) {
+ Exception failure;
+ switch (serverChangeStatus) {
+ case TIMEOUT:
+ // The two sentences were previously concatenated with no separating space.
+ failure = new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s. " +
+ "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardId.getShardName()));
+ break;
+ case NO_LEADER:
+ failure = createNoShardLeaderException(shardId);
+ break;
+ case NOT_SUPPORTED:
+ failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+ serverChange.getSimpleName(), shardId.getShardName()));
+ break;
+ default :
+ failure = new RuntimeException(String.format(
+ "%s request to leader %s for shard %s failed with status %s",
+ serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
}
- // call RemoveShard for the shardName
- getSender().tell(new akka.actor.Status.Success(true), getSelf());
- return;
+ return failure;
+ }
+
+ /**
+  * Starts removal of a shard replica by looking up the shard's primary. When a primary is found,
+  * remote or local, a {@code PrimaryShardFoundForContext} carrying the original request is sent to
+  * this actor with the handler's target actor as the message sender.
+  *
+  * @param shardReplicaMsg the removal request
+  */
+ private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+     LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+     final String requestedShard = shardReplicaMsg.getShardName();
+
+     findPrimary(requestedShard, new AutoFindPrimaryFailureResponseHandler(getSender(),
+             requestedShard, persistenceId(), getSelf()) {
+         @Override
+         public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+             getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response),
+                     getTargetActor());
+         }
+
+         @Override
+         public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+             getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response),
+                     getTargetActor());
+         }
+     });
}
private void persistShardList() {
private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
persistenceId());
- deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
+ 0, 0));
}
private static class ForwardedAddServerReply {
private final ShardIdentifier shardId;
private final String shardName;
private ActorRef actor;
- private ActorPath actorPath;
private final Map<String, String> initialPeerAddresses;
private Optional<DataTree> localShardDataTree;
private boolean leaderAvailable = false;
return actor;
}
- ActorPath getActorPath() {
- return actorPath;
- }
-
void setActor(ActorRef actor) {
this.actor = actor;
- this.actorPath = actor.path();
}
ShardIdentifier getShardId() {
void setLeaderAvailable(boolean leaderAvailable) {
this.leaderAvailable = leaderAvailable;
+
+ if(leaderAvailable) {
+ notifyOnShardInitializedCallbacks();
+ }
}
short getLeaderVersion() {
return shardName;
}
- public ActorRef getShardManagerActor() {
- return shardManagerActor;
- }
-
@Override
public void onFailure(Throwable failure) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
private final RemotePrimaryShardFound remotePrimaryShardFound;
private final LocalPrimaryShardFound localPrimaryShardFound;
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+ public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
+ @Nonnull Object primaryFoundMessage) {
this.shardName = Preconditions.checkNotNull(shardName);
this.contextMessage = Preconditions.checkNotNull(contextMessage);
Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+ this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
+ (RemotePrimaryShardFound) primaryFoundMessage : null;
+ this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
+ (LocalPrimaryShardFound) primaryFoundMessage : null;
}
@Nonnull
- public String getPrimaryPath(){
- if(remotePrimaryShardFound != null){
+ String getPrimaryPath(){
+ if(remotePrimaryShardFound != null) {
return remotePrimaryShardFound.getPrimaryPath();
}
return localPrimaryShardFound.getPrimaryPath();
}
@Nonnull
- public Object getContextMessage() {
+ Object getContextMessage() {
return contextMessage;
}
@Nullable
- public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+ RemotePrimaryShardFound getRemotePrimaryShardFound() {
return remotePrimaryShardFound;
}
- @Nullable
- public LocalPrimaryShardFound getLocalPrimaryShardFound(){
- return localPrimaryShardFound;
+ @Nonnull
+ String getShardName() {
+ return shardName;
+ }
+ }
+
+ /**
+ * The WrappedShardResponse class wraps a response from a Shard.
+ */
+ private static class WrappedShardResponse {
+ private final ShardIdentifier shardId;
+ private final Object response;
+ private final String leaderPath;
+
+ private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+ this.shardId = shardId;
+ this.response = response;
+ this.leaderPath = leaderPath;
+ }
+
+ ShardIdentifier getShardId() {
+ return shardId;
}
- boolean isPrimaryLocal(){
- return (remotePrimaryShardFound == null);
+ Object getResponse() {
+ return response;
}
- @Nonnull
- public String getShardName() {
- return shardName;
+ String getLeaderPath() {
+ return leaderPath;
}
}
}