From: Tom Pantelis Date: Tue, 29 Dec 2015 15:11:19 +0000 (-0500) Subject: Implement graceful ShardManager shutdown X-Git-Tag: release/beryllium~43 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=72a72ae745544e41603f851bf4f47087cfe521ba Implement graceful ShardManager shutdown Implemented the Shutdown message in ShardManager to issue a graceful stop with a Shutdown message to each shard. When the stop futures complete, the ShardManager sends a PoisonPill to itself. The ActorContext now issues a graceful stop to the ShardManager with a Shutdown message. Change-Id: I13e17cc06d44f524a469cea7acfa45c3ba5e52bd Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 4fd707e065..5e8b1913c4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -193,6 +193,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, @Override public void close() { + LOG.info("Closing data store {}", type); + if (datastoreConfigMXBean != null) { datastoreConfigMXBean.unregisterMBean(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index a91109c64b..c39c80021c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,8 +19,10 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; +import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; @@ -89,6 +91,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; @@ -100,6 +103,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -184,7 +188,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void postStop() { - LOG.info("Stopping ShardManager"); + LOG.info("Stopping ShardManager {}", persistenceId()); mBean.unregisterMBean(); } @@ -243,16 +247,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); - } else if (message 
instanceof SaveSnapshotSuccess) { + } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); - } else if (message instanceof SaveSnapshotFailure) { + } else if(message instanceof SaveSnapshotFailure) { LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), ((SaveSnapshotFailure) message).cause()); + } else if(message instanceof Shutdown) { + onShutDown(); } else { unknownMessage(message); } } + private void onShutDown() { + Shutdown shutdown = new Shutdown(); + List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size()); + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId()); + + FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2); + stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown)); + } + } + + LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size()); + + ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client); + Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher); + + combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() { + @Override + public void onComplete(Throwable failure, Iterable<Boolean> results) { + LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId()); + + self().tell(PoisonPill.getInstance(), self()); + + if(failure != null) { + LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure); + } else { + int nfailed = 0; + for(Boolean r: results) { + if(!r) { + nfailed++; + } + } + + if(nfailed > 0) { + LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed); + } + } + } + }, dispatcher); + } + private void onWrappedShardResponse(WrappedShardResponse message) { if 
(message.getResponse() instanceof RemoveServerReply) { onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 42deda8227..1dc49dc88a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -14,10 +14,10 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; -import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -45,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.reporting.MetricsReporter; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -377,7 +378,12 @@ public class ActorContext { } public void shutdown() { - shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + FiniteDuration duration = 
datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3); + try { + Await.ready(Patterns.gracefulStop(shardManager, duration, new Shutdown()), duration); + } catch(Exception e) { + LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e); + } } public ClusterWrapper getClusterWrapper() { @@ -502,6 +508,7 @@ public class ActorContext { * @return * @deprecated Use {@link #getDataStoreName()} instead. */ + @Deprecated public String getDataStoreType() { return datastoreContext.getDataStoreName(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index e32e9385f4..395ac223e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -105,6 +106,7 @@ import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import 
org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; @@ -1911,6 +1913,54 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testShardPersistenceWithRestoredData ending"); } + @Test + public void testShutDown() throws Exception { + LOG.info("testShutDown starting"); + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.<String, List<String>>builder(). + put("shard1", Arrays.asList("member-1")). + put("shard2", Arrays.asList("member-1")).build()); + + String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard1 = actorFactory.createTestActor( + MessageCollectorActor.props(), shardId1); + + String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard2 = actorFactory.createTestActor( + MessageCollectorActor.props(), shardId2); + + TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder( + mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), shard1); + shardManager.tell(new ActorInitialized(), shard2); + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown()); + + MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class); + MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class); + + try { + Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + fail("ShardManager actor stopped without waiting for the Shards to be stopped"); + } catch(TimeoutException e) { + // expected + } + + actorFactory.killActor(shard1, this); + 
actorFactory.killActor(shard2, this); + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + }}; + + LOG.info("testShutDown ending"); + } private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index 8be62c482e..bdfdfc2832 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -36,7 +36,7 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext private final DOMRpcProviderService rpcProvisionRegistry; private ListenerRegistration schemaListenerRegistration; - private ActorSystem actorSystem; + private final ActorSystem actorSystem; private Broker.ProviderSession brokerSession; private SchemaContext schemaContext; private ActorRef rpcManager; @@ -53,10 +53,6 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext @Override public void close() throws Exception { - if (actorSystem != null) { - actorSystem.shutdown(); - actorSystem = null; - } if (schemaListenerRegistration != null) { schemaListenerRegistration.close(); schemaListenerRegistration = null;