X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=285dbd71f169bb9ec93f82d3c3aa4a9afc35f23d;hp=d3d8ce39c9a108f0b497b942f9e6e5368f3c6a21;hb=6ef8a6b4e403d5908e7090a5bd387f81c10c91c6;hpb=e345c2a17f737d537cda45b0f737dff417e3b359 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index d3d8ce39c9..285dbd71f1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -85,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -167,7 +168,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set shardReplicaOperationsInProgress = new HashSet<>(); - private final Map> shardActorStoppingFutures = new HashMap<>(); + private final Map> shardActorsStopping = new HashMap<>(); private final String persistenceId; private final AbstractDataStore dataStore; @@ -490,24 +491,34 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef shardActor = shardInformation.getActor(); if (shardActor != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor); - FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig() - .getElectionTimeOutInterval().$times(3); - final Future stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE); - shardActorStoppingFutures.put(shardName, stopFuture); - stopFuture.onComplete(new OnComplete() { + long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().$times(3).toMillis(), 10000); + + LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor, + timeoutInMS); + + final Future stopFuture = Patterns.gracefulStop(shardActor, + FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE); + + final CompositeOnComplete onComplete = new CompositeOnComplete() { @Override - public void onComplete(Throwable failure, Boolean result) { + public void onComplete(final Throwable failure, final Boolean result) { if (failure == null) { LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor); } else { LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure); } - self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName), - ActorRef.noSender()); + self().tell((RunnableMessage) () -> { + shardActorsStopping.remove(shardName); + notifyOnCompleteTasks(failure, result); + }, ActorRef.noSender()); } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }; + + shardActorsStopping.put(shardName, onComplete); + stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers()) + .getDispatcher(Dispatchers.DispatcherType.Client)); } LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName); @@ -594,21 +605,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { - final Future stopFuture = shardActorStoppingFutures.get(shardName); - if (stopFuture == null) { + final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); + if (stopOnComplete == null) { return false; } - LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(), + LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(), shardName, messageToDefer); final ActorRef sender = getSender(); - stopFuture.onComplete(new OnComplete() { + stopOnComplete.addOnComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Boolean result) { LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer); self().tell(messageToDefer, sender); } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); return true; }