X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=b6833f3004a18b1a6aca864e0ff0d4fa26348239;hb=ffe27dc93dc6f6a190287164f10444f4f6838d59;hp=a91109c64b3fa5aac674677b95d2a31c83477fbd;hpb=d4d59200f8c56551755c36fbbd2b4aa52defa5cb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index a91109c64b..b6833f3004 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,8 +19,10 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; +import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; @@ -89,6 +91,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; @@ -100,6 +103,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -184,7 +188,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void postStop() { - LOG.info("Stopping ShardManager"); + LOG.info("Stopping ShardManager {}", persistenceId()); mBean.unregisterMBean(); } @@ -243,25 +247,85 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); - } else if (message instanceof SaveSnapshotSuccess) { + } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); - } else if (message instanceof SaveSnapshotFailure) { + } else if(message instanceof SaveSnapshotFailure) { LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), ((SaveSnapshotFailure) message).cause()); + } else if(message instanceof Shutdown) { + onShutDown(); } else { unknownMessage(message); } } + private void onShutDown() { + Shutdown shutdown = new Shutdown(); + List> stopFutures = new ArrayList<>(localShards.size()); + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId()); + + FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2); + stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown)); + } + } + + LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size()); + + ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client); + Future> combinedFutures = Futures.sequence(stopFutures, dispatcher); + + combinedFutures.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable results) { + LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId()); + + self().tell(PoisonPill.getInstance(), self()); + + if(failure != null) { + LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure); + } else { + int nfailed = 0; + for(Boolean r: results) { + if(!r) { + nfailed++; + } + } + + if(nfailed > 0) { + LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed); + } + } + } + }, dispatcher); + } + private void onWrappedShardResponse(WrappedShardResponse message) { if (message.getResponse() instanceof RemoveServerReply) { - onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); + onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(), + message.getLeaderPath()); } } - private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { - shardReplicaOperationsInProgress.remove(shardName); - originalSender.tell(new Status.Success(null), self()); + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, + String leaderPath) { + shardReplicaOperationsInProgress.remove(shardId); + + LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); + + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), + shardId.getShardName()); + originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + } else { + LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", + persistenceId(), shardId, replyMsg.getStatus()); + + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), + leaderPath, shardId); + originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); + } } private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { @@ -308,7 +372,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); } else { // SUCCESS - self().tell(new WrappedShardResponse(shardName, response), sender); + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -321,8 +385,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; } else if(shardInformation.getActor() != null) { - LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(PoisonPill.getInstance(), self()); + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); + shardInformation.getActor().tell(new Shutdown(), self()); } LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); persistShardList(); @@ -1786,21 +1850,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * The WrappedShardResponse class wraps a response from a Shard. */ private static class WrappedShardResponse { - private final String shardName; + private final ShardIdentifier shardId; private final Object response; + private final String leaderPath; - private WrappedShardResponse(String shardName, Object response) { - this.shardName = shardName; + private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + this.shardId = shardId; this.response = response; + this.leaderPath = leaderPath; } - String getShardName() { - return shardName; + ShardIdentifier getShardId() { + return shardId; } Object getResponse() { return response; } + + String getLeaderPath() { + return leaderPath; + } } }