Remove clientTxVersion from ShardTransaction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
index 4e45dc4f212f37d2de0f36b9ba2720c5bb004c42..7476ee2802706818b0237836625266ffcfa2efcd 100644 (file)
@@ -242,7 +242,7 @@ public class ShardManagerTest extends AbstractActorTest {
         return shardManager;
     }
 
-    private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+    private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
         AssertionError last = null;
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
@@ -260,7 +260,7 @@ public class ShardManagerTest extends AbstractActorTest {
         throw last;
     }
 
-    private <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+    private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
         if(reply instanceof Failure) {
             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
@@ -522,6 +522,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
+        datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
@@ -645,7 +646,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilderWithMockShardActor().cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -661,7 +663,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -714,7 +717,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -729,7 +733,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -757,8 +762,8 @@ public class ShardManagerTest extends AbstractActorTest {
             String path = found.getPrimaryPath();
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForUnreachableMember();
 
@@ -766,8 +771,8 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
             MessageCollectorActor.clearMessages(mockShardActor1);
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
 
@@ -775,8 +780,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForReachableMember();
 
@@ -790,21 +795,21 @@ public class ShardManagerTest extends AbstractActorTest {
             String path1 = found1.getPrimaryPath();
             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
 
             // Test FindPrimary wait succeeds after reachable member event.
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
             shardManager1.underlyingActor().waitForUnreachableMember();
 
             shardManager1.tell(new FindPrimary("default", true), getRef());
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                    createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                    createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path2 = found2.getPrimaryPath();
@@ -829,8 +834,8 @@ public class ShardManagerTest extends AbstractActorTest {
         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
-                        new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
-                shardManagerID);
+                        new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
+                            withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
 
@@ -845,7 +850,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(
+                                Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@@ -875,8 +881,8 @@ public class ShardManagerTest extends AbstractActorTest {
             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
 
-            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
-                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+            shardManager1.tell(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
 
             shardManager1.underlyingActor().waitForUnreachableMember();
 
@@ -1352,6 +1358,8 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRestoreFromSnapshot() throws Throwable {
         LOG.info("testRestoreFromSnapshot starting");
 
+        datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
+
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
@@ -1405,6 +1413,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testAddShardReplica() throws Exception {
+        LOG.info("testAddShardReplica starting");
         MockConfiguration mockConfig =
                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
                    put("default", Arrays.asList("member-1", "member-2")).
@@ -1419,7 +1428,7 @@ public class ShardManagerTest extends AbstractActorTest {
         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
-                        new ClusterWrapperImpl(system1)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
         final ActorSystem system2 = newActorSystem("Member2");
@@ -1427,10 +1436,12 @@ public class ShardManagerTest extends AbstractActorTest {
 
         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+                TestActorRef.create(system2, Props.create(MockRespondActor.class).
+                        withDispatcher(Dispatchers.DefaultDispatcherId()), name);
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
-                        new ClusterWrapperImpl(system2)).props(), shardManagerID);
+                        new ClusterWrapperImpl(system2)).props().
+                            withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
 
         new JavaTestKit(system1) {{
 
@@ -1477,7 +1488,6 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
         }};
-
         LOG.info("testAddShardReplica ending");
     }
 
@@ -1614,6 +1624,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
+        datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
         new JavaTestKit(getSystem()) {{
             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
                        put("astronauts", Arrays.asList("member-2")).build());
@@ -1622,7 +1633,8 @@ public class ShardManagerTest extends AbstractActorTest {
                     shardActor(mockShardActor).props(), shardMgrID);
 
             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
+            MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
+                    AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
 
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
@@ -1691,7 +1703,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
-                        new ClusterWrapperImpl(system1)).props(),
+                        new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-2.
@@ -1706,7 +1718,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
-                        new ClusterWrapperImpl(system2)).props(),
+                        new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 shardManagerID);
 
         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
@@ -1777,7 +1789,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
 
     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
-                                                      final Class firstForwardedServerChangeClass,
+                                                      final Class<?> firstForwardedServerChangeClass,
                                                       final Object secondServerChange) throws Exception {
         new JavaTestKit(getSystem()) {{
             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
@@ -1789,7 +1801,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
-                            new MockClusterWrapper()).props(),
+                            new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     shardMgrID);
 
             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
@@ -1922,22 +1934,23 @@ public class ShardManagerTest extends AbstractActorTest {
             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
                     type(shardMrgIDSuffix).build().toString();
             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
-                    MessageCollectorActor.props(), shardId1);
+                    MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
 
             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
                     type(shardMrgIDSuffix).build().toString();
             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
-                    MessageCollectorActor.props(), shardId2);
+                    MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
 
             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
-                    mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
+                    mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
+                        withDispatcher(Dispatchers.DefaultDispatcherId()));
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), shard1);
             shardManager.tell(new ActorInitialized(), shard2);
 
             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
-            Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown());
+            Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
 
             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
@@ -2001,22 +2014,22 @@ public class ShardManagerTest extends AbstractActorTest {
                 if(message instanceof FindPrimary) {
                     findPrimaryMessageReceived.countDown();
                 } else if(message instanceof ClusterEvent.MemberUp) {
-                    String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+                    String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberUpReceived.countDown();
                     }
                 } else if(message instanceof ClusterEvent.MemberRemoved) {
-                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
                 } else if(message instanceof ClusterEvent.UnreachableMember) {
-                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberUnreachableReceived.countDown();
                     }
                 } else if(message instanceof ClusterEvent.ReachableMember) {
-                    String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+                    String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberReachableReceived.countDown();
                     }
@@ -2156,7 +2169,7 @@ public class ShardManagerTest extends AbstractActorTest {
         boolean canIntercept(Object message);
     }
 
-    private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
+    private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
         return new MessageInterceptor(){
             @Override
             public Object apply(Object message) {