Bug 8385: Fix testMultipleRegistrationsAtOnePrefix failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index d3d8ce39c9a108f0b497b942f9e6e5368f3c6a21..285dbd71f169bb9ec93f82d3c3aa4a9afc35f23d 100644 (file)
@@ -85,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -167,7 +168,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
-    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+    private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
 
     private final String persistenceId;
     private final AbstractDataStore dataStore;
 
     private final String persistenceId;
     private final AbstractDataStore dataStore;
@@ -490,24 +491,34 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ActorRef shardActor = shardInformation.getActor();
         if (shardActor != null) {
 
         final ActorRef shardActor = shardInformation.getActor();
         if (shardActor != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
-            FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
-                    .getElectionTimeOutInterval().$times(3);
-            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
-            shardActorStoppingFutures.put(shardName, stopFuture);
-            stopFuture.onComplete(new OnComplete<Boolean>() {
+            long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
+                    .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+
+            LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+                    timeoutInMS);
+
+            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
+                    FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
+
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
                 @Override
                 @Override
-                public void onComplete(Throwable failure, Boolean result) {
+                public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
                     } else {
                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
                     }
 
                     if (failure == null) {
                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
                     } else {
                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
                     }
 
-                    self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
-                            ActorRef.noSender());
+                    self().tell((RunnableMessage) () -> {
+                        shardActorsStopping.remove(shardName);
+                        notifyOnCompleteTasks(failure, result);
+                    }, ActorRef.noSender());
                 }
                 }
-            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+            };
+
+            shardActorsStopping.put(shardName, onComplete);
+            stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
+                    .getDispatcher(Dispatchers.DispatcherType.Client));
         }
 
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
         }
 
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
@@ -594,21 +605,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
     }
 
     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
-        final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
-        if (stopFuture == null) {
+        final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+        if (stopOnComplete == null) {
             return false;
         }
 
             return false;
         }
 
-        LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+        LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
                 shardName, messageToDefer);
         final ActorRef sender = getSender();
                 shardName, messageToDefer);
         final ActorRef sender = getSender();
-        stopFuture.onComplete(new OnComplete<Boolean>() {
+        stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
             @Override
             public void onComplete(Throwable failure, Boolean result) {
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
             @Override
             public void onComplete(Throwable failure, Boolean result) {
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
 
         return true;
     }
 
         return true;
     }