Fix sporadic ShardManagerTest failures 28/34928/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 17 Feb 2016 18:55:20 +0000 (13:55 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 19 Feb 2016 14:31:27 +0000 (14:31 +0000)
Some of the tests fail sporadically. Most were alleviated by:

  - using tell on an actor rather than calling receiveCommand directly
  - using the normal fork/join dispatcher for creating TestActors instead
    of the default CallingThread dispatcher.

After the changes the tests ran over 200 times successfully.

Change-Id: Ib2c7c3b6dace9e89dff54eccc58a2b8aabad75de
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 9c095745f3bad475d5c97b4ddc988b5ec4cd8607..8ef3f6f9fea1316dee1039282b61b0723e8b16c9 100644 (file)
@@ -933,8 +933,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 continue;
             }
 
-            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                    shardName, address);
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
+                    persistenceId(), shardName, address, visitedAddresses);
 
             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                     message.isWaitUntilReady(), visitedAddresses), getContext());
index 3a463980fc11723fe4d0c278fb8c78f351dea2f3..7476ee2802706818b0237836625266ffcfa2efcd 100644 (file)
@@ -522,6 +522,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
+        datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
@@ -645,7 +646,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilderWithMockShardActor().cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -661,7 +663,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -714,7 +717,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -729,7 +733,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -757,8 +762,8 @@ public class ShardManagerTest extends AbstractActorTest {
             String path = found.getPrimaryPath();
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForUnreachableMember();
 
@@ -766,8 +771,8 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
             MessageCollectorActor.clearMessages(mockShardActor1);
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
 
@@ -775,8 +780,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForReachableMember();
 
@@ -790,21 +795,21 @@ public class ShardManagerTest extends AbstractActorTest {
             String path1 = found1.getPrimaryPath();
             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
 
             // Test FindPrimary wait succeeds after reachable member event.
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
             shardManager1.underlyingActor().waitForUnreachableMember();
 
             shardManager1.tell(new FindPrimary("default", true), getRef());
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path2 = found2.getPrimaryPath();
@@ -829,8 +834,8 @@ public class ShardManagerTest extends AbstractActorTest {
         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
-                        new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
-                shardManagerID);
+                        new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
+                            withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -845,7 +850,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@@ -875,8 +881,8 @@ public class ShardManagerTest extends AbstractActorTest {
             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForUnreachableMember();
 
@@ -1352,6 +1358,8 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRestoreFromSnapshot() throws Throwable {
         LOG.info("testRestoreFromSnapshot starting");
 
+        datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
+
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
@@ -1405,6 +1413,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testAddShardReplica() throws Exception {
+        LOG.info("testAddShardReplica starting");
         MockConfiguration mockConfig =
                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
                    put("default", Arrays.asList("member-1", "member-2")).
@@ -1419,7 +1428,7 @@ public class ShardManagerTest extends AbstractActorTest {
         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
         final ActorSystem system2 = newActorSystem("Member2");
@@ -1427,10 +1436,12 @@ public class ShardManagerTest extends AbstractActorTest {
 
         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+                TestActorRef.create(system2, Props.create(MockRespondActor.class).
+                        withDispatcher(Dispatchers.DefaultDispatcherId()), name);
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().
+                            withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -1477,7 +1488,6 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
         }};
-
         LOG.info("testAddShardReplica ending");
     }
 
@@ -1614,6 +1624,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
+        datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
         new JavaTestKit(getSystem()) {{
             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
                        put("astronauts", Arrays.asList("member-2")).build());
@@ -1622,7 +1633,8 @@ public class ShardManagerTest extends AbstractActorTest {
                     shardActor(mockShardActor).props(), shardMgrID);
 
             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
+            MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
+                    AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
 
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
@@ -1691,7 +1703,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
-                        new ClusterWrapperImpl(system1)).props(),
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
@@ -1706,7 +1718,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
-                        new ClusterWrapperImpl(system2)).props(),
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 shardManagerID);
 
         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
@@ -1789,7 +1801,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
-                            new MockClusterWrapper()).props(),
+                            new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     shardMgrID);
 
             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
@@ -1922,15 +1934,16 @@ public class ShardManagerTest extends AbstractActorTest {
             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
                     type(shardMrgIDSuffix).build().toString();
             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
-                    MessageCollectorActor.props(), shardId1);
+                    MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
 
             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
                     type(shardMrgIDSuffix).build().toString();
             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
-                    MessageCollectorActor.props(), shardId2);
+                    MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
 
             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
-                    mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
+                    mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
+                        withDispatcher(Dispatchers.DefaultDispatcherId()));
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), shard1);