Implement graceful ShardManager shutdown 52/31952/8
authorTom Pantelis <tpanteli@brocade.com>
Tue, 29 Dec 2015 15:11:19 +0000 (10:11 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 8 Jan 2016 02:24:50 +0000 (02:24 +0000)
Implemented the ShutDown message in ShardManager to issue a graceful stop
with a Shutdown message to each shard. When the stop futures complete,
the ShardManager sends a PoisionPill to itself.

The ActorContext now issues a graceful stop to the ShardManager with a
Shutdown message.

Change-Id: I13e17cc06d44f524a469cea7acfa45c3ba5e52bd
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java

index 4fd707e..5e8b191 100644 (file)
@@ -193,6 +193,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     @Override
     public void close() {
+        LOG.info("Closing data store {}", type);
+
         if (datastoreConfigMXBean != null) {
             datastoreConfigMXBean.unregisterMBean();
         }
index a91109c..c39c800 100644 (file)
@@ -19,8 +19,10 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -89,6 +91,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
@@ -100,6 +103,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -184,7 +188,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void postStop() {
-        LOG.info("Stopping ShardManager");
+        LOG.info("Stopping ShardManager {}", persistenceId());
 
         mBean.unregisterMBean();
     }
@@ -243,16 +247,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if (message instanceof SaveSnapshotSuccess) {
+        } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if (message instanceof SaveSnapshotFailure) {
+        } else if(message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
+        } else if(message instanceof Shutdown) {
+            onShutDown();
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onShutDown() {
+        Shutdown shutdown = new Shutdown();
+        List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+                stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown));
+            }
+        }
+
+        LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+        combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+                LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+                self().tell(PoisonPill.getInstance(), self());
+
+                if(failure != null) {
+                    LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+                } else {
+                    int nfailed = 0;
+                    for(Boolean r: results) {
+                        if(!r) {
+                            nfailed++;
+                        }
+                    }
+
+                    if(nfailed > 0) {
+                        LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+                    }
+                }
+            }
+        }, dispatcher);
+    }
+
     private void onWrappedShardResponse(WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
index 42deda8..1dc49dc 100644 (file)
@@ -14,10 +14,10 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
-import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -45,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -377,7 +378,12 @@ public class ActorContext {
     }
 
     public void shutdown() {
-        shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
+        try {
+            Await.ready(Patterns.gracefulStop(shardManager, duration, new Shutdown()), duration);
+        } catch(Exception e) {
+            LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
+        }
     }
 
     public ClusterWrapper getClusterWrapper() {
@@ -502,6 +508,7 @@ public class ActorContext {
      * @return
      * @deprecated Use {@link #getDataStoreName()} instead.
      */
+    @Deprecated
     public String getDataStoreType() {
         return datastoreContext.getDataStoreName();
     }
index e32e938..395ac22 100644 (file)
@@ -13,6 +13,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -105,6 +106,7 @@ import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
@@ -1911,6 +1913,54 @@ public class ShardManagerTest extends AbstractActorTest {
         LOG.info("testShardPersistenceWithRestoredData ending");
     }
 
+    @Test
+    public void testShutDown() throws Exception {
+        LOG.info("testShutDown starting");
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("shard1", Arrays.asList("member-1")).
+                            put("shard2", Arrays.asList("member-1")).build());
+
+            String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
+                    type(shardMrgIDSuffix).build().toString();
+            TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
+                    MessageCollectorActor.props(), shardId1);
+
+            String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
+                    type(shardMrgIDSuffix).build().toString();
+            TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
+                    MessageCollectorActor.props(), shardId2);
+
+            TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
+                    mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), shard1);
+            shardManager.tell(new ActorInitialized(), shard2);
+
+            FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+            Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown());
+
+            MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
+            MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
+
+            try {
+                Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+                fail("ShardManager actor stopped without waiting for the Shards to be stopped");
+            } catch(TimeoutException e) {
+                // expected
+            }
+
+            actorFactory.killActor(shard1, this);
+            actorFactory.killActor(shard2, this);
+
+            Boolean stopped = Await.result(stopFuture, duration);
+            assertEquals("Stopped", Boolean.TRUE, stopped);
+        }};
+
+        LOG.info("testShutDown ending");
+    }
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
index 8be62c4..bdfdfc2 100644 (file)
@@ -36,7 +36,7 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
   private final DOMRpcProviderService rpcProvisionRegistry;
 
   private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
-  private ActorSystem actorSystem;
+  private final ActorSystem actorSystem;
   private Broker.ProviderSession brokerSession;
   private SchemaContext schemaContext;
   private ActorRef rpcManager;
@@ -53,10 +53,6 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
 
   @Override
   public void close() throws Exception {
-    if (actorSystem != null) {
-        actorSystem.shutdown();
-        actorSystem = null;
-    }
     if (schemaListenerRegistration != null) {
         schemaListenerRegistration.close();
         schemaListenerRegistration = null;