Implement graceful ShardManager shutdown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index a91109c64b3fa5aac674677b95d2a31c83477fbd..c39c80021c456b20565bb214da63ee52e515f29d 100644 (file)
@@ -19,8 +19,10 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -89,6 +91,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
@@ -100,6 +103,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -184,7 +188,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
+    /**
+     * Logs that this ShardManager is stopping, identified by its persistence id, and
+     * unregisters its MBean.
+     */
     @Override
     public void postStop() {
-        LOG.info("Stopping ShardManager");
+        LOG.info("Stopping ShardManager {}", persistenceId());
 
         mBean.unregisterMBean();
     }
@@ -243,16 +247,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if (message instanceof SaveSnapshotSuccess) {
+        } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if (message instanceof SaveSnapshotFailure) {
+        } else if(message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
+        } else if(message instanceof Shutdown) {
+            onShutDown();
         } else {
             unknownMessage(message);
         }
     }
 
+    /**
+     * Handles a {@link Shutdown} message: issues a graceful stop to every local shard that
+     * currently has an actor, then sends a PoisonPill to this actor once the combined stop
+     * future completes, whether it succeeded or failed.
+     */
+    private void onShutDown() {
+        // Snapshot the id here, on the actor's own thread. The completion callback below is
+        // executed by the Client dispatcher, and Akka advises against calling methods of the
+        // enclosing actor from a future callback.
+        final String id = persistenceId();
+
+        Shutdown shutdown = new Shutdown();
+        List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                LOG.debug("{}: Issuing gracefulStop to shard {}", id, info.getShardId());
+
+                // Each shard is given twice its election timeout interval to stop.
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+                        .getElectionTimeOutInterval().$times(2);
+                stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown));
+            }
+        }
+
+        LOG.info("Shutting down ShardManager {} - waiting on {} shards", id, stopFutures.size());
+
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+                .getDispatcher(Dispatchers.DispatcherType.Client);
+        Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+        combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+                LOG.debug("{}: All shards shutdown - sending PoisonPill to self", id);
+
+                // The PoisonPill is sent in every case, so a failed or timed-out shard stop
+                // does not keep the ShardManager alive.
+                self().tell(PoisonPill.getInstance(), self());
+
+                if(failure != null) {
+                    LOG.warn("{}: An error occurred attempting to shut down the shards", id, failure);
+                } else {
+                    // A false result marks a shard whose graceful stop did not succeed.
+                    int nfailed = 0;
+                    for(Boolean r: results) {
+                        if(!r) {
+                            nfailed++;
+                        }
+                    }
+
+                    if(nfailed > 0) {
+                        LOG.warn("{}: {} shards did not shut down gracefully", id, nfailed);
+                    }
+                }
+            }
+        }, dispatcher);
+    }
+
     private void onWrappedShardResponse(WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());