Bug 8385: Fix testMultipleRegistrationsAtOnePrefix failure
author    Tom Pantelis <tompantelis@gmail.com>
Tue, 9 May 2017 12:29:07 +0000 (08:29 -0400)
committer Robert Varga <nite@hq.sk>
Wed, 10 May 2017 09:18:45 +0000 (09:18 +0000)
The previous patch added a callback on the Future returned by
gracefulStop on shard removal. The timeout was set to 3 * the election
timeout, which is 30 s by default in production. The tests use a 500 ms
election timeout, so the shutdown timeout is 1500 ms. However, if the
timing is right, the leader may not be able to transfer leadership on
shutdown when the other member has already been shut down: shutdown
waits 2 s to hear from a new leader, which exceeds the 1500 ms shutdown
timeout and leads to test failure. To alleviate this, I made 10 s the
minimum for the shutdown timeout.
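
For illustration, a minimal sketch of the new timeout computation, assuming a
hypothetical helper class (the real change lives in ShardManager.java in the
diff below):

    import java.util.concurrent.TimeUnit;
    import scala.concurrent.duration.FiniteDuration;

    // Hypothetical helper: the shutdown timeout is 3 * election timeout but
    // never less than 10 s, so the ~2 s leadership-transfer wait on shutdown
    // cannot outlast the gracefulStop timeout in test configurations.
    final class ShutdownTimeoutSketch {
        static FiniteDuration shutdownTimeout(final FiniteDuration electionTimeout) {
            final long timeoutInMS = Math.max(electionTimeout.$times(3).toMillis(), 10000);
            return FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
        }
    }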

Another problem was that, after the stop Future failed, the OnComplete
callback for the deferred PrefixShardCreated message was re-run many
times before the other OnComplete callback queued the message that
removes the Future from the map. To alleviate this, I added a
CompositeOnComplete that holds the list of deferred OnComplete tasks,
which lets the ShardManager remove the map entry before the deferred
tasks are run.
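
A minimal usage sketch of the new flow, assuming the CompositeOnComplete class
added below; the surrounding class and the direct map removal are illustrative
(the real code defers the removal to the actor via self().tell):

    import java.util.HashMap;
    import java.util.Map;
    import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;

    // Illustrative sketch: the map tracks the CompositeOnComplete rather than
    // the raw stop Future, so the entry is removed before deferred tasks re-run.
    final class StopTrackingSketch {
        private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();

        CompositeOnComplete<Boolean> trackStop(final String shardName) {
            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
                @Override
                public void onComplete(final Throwable failure, final Boolean result) {
                    shardActorsStopping.remove(shardName);   // remove the map entry first
                    notifyOnCompleteTasks(failure, result);  // then run the deferred tasks
                }
            };
            shardActorsStopping.put(shardName, onComplete);
            return onComplete;
        }
    }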

Change-Id: I899518e6d7e92533d2c4008a978ac772b02863cf
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/CompositeOnComplete.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java

index d3d8ce39c9a108f0b497b942f9e6e5368f3c6a21..285dbd71f169bb9ec93f82d3c3aa4a9afc35f23d 100644 (file)
@@ -85,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -167,7 +168,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
-    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+    private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
 
     private final String persistenceId;
     private final AbstractDataStore dataStore;
@@ -490,24 +491,34 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ActorRef shardActor = shardInformation.getActor();
         if (shardActor != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
-            FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
-                    .getElectionTimeOutInterval().$times(3);
-            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
-            shardActorStoppingFutures.put(shardName, stopFuture);
-            stopFuture.onComplete(new OnComplete<Boolean>() {
+            long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
+                    .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+
+            LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+                    timeoutInMS);
+
+            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
+                    FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
+
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
                 @Override
-                public void onComplete(Throwable failure, Boolean result) {
+                public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
                     } else {
                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
                     }
 
-                    self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
-                            ActorRef.noSender());
+                    self().tell((RunnableMessage) () -> {
+                        shardActorsStopping.remove(shardName);
+                        notifyOnCompleteTasks(failure, result);
+                    }, ActorRef.noSender());
                 }
-            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+            };
+
+            shardActorsStopping.put(shardName, onComplete);
+            stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
+                    .getDispatcher(Dispatchers.DispatcherType.Client));
         }
 
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
@@ -594,21 +605,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
-        final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
-        if (stopFuture == null) {
+        final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+        if (stopOnComplete == null) {
             return false;
         }
 
-        LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+        LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
                 shardName, messageToDefer);
         final ActorRef sender = getSender();
-        stopFuture.onComplete(new OnComplete<Boolean>() {
+        stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
             @Override
             public void onComplete(Throwable failure, Boolean result) {
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
 
         return true;
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/CompositeOnComplete.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/CompositeOnComplete.java
new file mode 100644 (file)
index 0000000..0ae3fef
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.OnComplete;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An OnComplete implementation that aggregates other OnComplete tasks.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <T> the result type
+ */
+public abstract class CompositeOnComplete<T> extends OnComplete<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeOnComplete.class);
+
+    private final List<OnComplete<T>> onCompleteTasks = new ArrayList<>();
+
+    public void addOnComplete(OnComplete<T> task) {
+        onCompleteTasks.add(task);
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    protected void notifyOnCompleteTasks(Throwable failure, T result) {
+        for (OnComplete<T> task: onCompleteTasks) {
+            try {
+                task.onComplete(failure, result);
+            } catch (Throwable e) {
+                LOG.error("Caught unexpected exception", e);
+            }
+        }
+
+        onCompleteTasks.clear();
+    }
+}
index ca8af73cb0d8087b7722652d8c3f800fa562cc57..af07d62d9cf44ec713ba3bf9bbca6d6cb4dfbc9f 100644 (file)
@@ -30,7 +30,6 @@ import java.util.Set;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
@@ -383,13 +382,12 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     }
 
     @Test
-    @Ignore
     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
         LOG.info("testMultipleRegistrationsAtOnePrefix starting");
         initEmptyDatastores();
 
-        for (int i = 0; i < 10; i++) {
-            LOG.debug("Round {}", i);
+        for (int i = 0; i < 5; i++) {
+            LOG.info("Round {}", i);
             final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                     TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                     DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);