@Override
public void close() {
+ LOG.info("Closing data store {}", type);
+
if (datastoreConfigMXBean != null) {
datastoreConfigMXBean.unregisterMBean();
}
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
+import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@Override
public void postStop() {
- LOG.info("Stopping ShardManager");
+ LOG.info("Stopping ShardManager {}", persistenceId());
mBean.unregisterMBean();
}
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
- } else if (message instanceof SaveSnapshotSuccess) {
+ } else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if (message instanceof SaveSnapshotFailure) {
+ } else if(message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
+ } else if(message instanceof Shutdown) {
+ onShutDown();
} else {
unknownMessage(message);
}
}
+ private void onShutDown() {
+ Shutdown shutdown = new Shutdown();
+ List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() != null) {
+ LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown));
+ }
+ }
+
+ LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+ combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Boolean> results) {
+ LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+ self().tell(PoisonPill.getInstance(), self());
+
+ if(failure != null) {
+ LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+ } else {
+ int nfailed = 0;
+ for(Boolean r: results) {
+ if(!r) {
+ nfailed++;
+ }
+ }
+
+ if(nfailed > 0) {
+ LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+ }
+ }
+ }
+ }, dispatcher);
+ }
+
private void onWrappedShardResponse(WrappedShardResponse message) {
if (message.getResponse() instanceof RemoveServerReply) {
onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
-import akka.actor.PoisonPill;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
}
public void shutdown() {
- shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
+ try {
+ Await.ready(Patterns.gracefulStop(shardManager, duration, new Shutdown()), duration);
+ } catch(Exception e) {
+ LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
+ }
}
public ClusterWrapper getClusterWrapper() {
* @return
* @deprecated Use {@link #getDataStoreName()} instead.
*/
+ @Deprecated
public String getDataStoreType() {
return datastoreContext.getDataStoreName();
}
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
LOG.info("testShardPersistenceWithRestoredData ending");
}
+ @Test
+ public void testShutDown() throws Exception {
+ LOG.info("testShutDown starting");
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("shard1", Arrays.asList("member-1")).
+ put("shard2", Arrays.asList("member-1")).build());
+
+ String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
+ type(shardMrgIDSuffix).build().toString();
+ TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
+ MessageCollectorActor.props(), shardId1);
+
+ String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
+ type(shardMrgIDSuffix).build().toString();
+ TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
+ MessageCollectorActor.props(), shardId2);
+
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
+ mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), shard1);
+ shardManager.tell(new ActorInitialized(), shard2);
+
+ FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+ Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown());
+
+ MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
+ MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
+
+ try {
+ Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+ fail("ShardManager actor stopped without waiting for the Shards to be stopped");
+ } catch(TimeoutException e) {
+ // expected
+ }
+
+ actorFactory.killActor(shard1, this);
+ actorFactory.killActor(shard2, this);
+
+ Boolean stopped = Await.result(stopFuture, duration);
+ assertEquals("Stopped", Boolean.TRUE, stopped);
+ }};
+
+ LOG.info("testShutDown ending");
+ }
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final DOMRpcProviderService rpcProvisionRegistry;
private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
- private ActorSystem actorSystem;
+ private final ActorSystem actorSystem;
private Broker.ProviderSession brokerSession;
private SchemaContext schemaContext;
private ActorRef rpcManager;
@Override
public void close() throws Exception {
- if (actorSystem != null) {
- actorSystem.shutdown();
- actorSystem = null;
- }
if (schemaListenerRegistration != null) {
schemaListenerRegistration.close();
schemaListenerRegistration = null;