X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=7476ee2802706818b0237836625266ffcfa2efcd;hb=b124e8216055ee30a87207c8b8a95e5c9661f291;hp=4e45dc4f212f37d2de0f36b9ba2720c5bb004c42;hpb=ffe27dc93dc6f6a190287164f10444f4f6838d59;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 4e45dc4f21..7476ee2802 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -242,7 +242,7 @@ public class ShardManagerTest extends AbstractActorTest { return shardManager; } - private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { + private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { AssertionError last = null; Stopwatch sw = Stopwatch.createStarted(); while(sw.elapsed(TimeUnit.SECONDS) <= 5) { @@ -260,7 +260,7 @@ public class ShardManagerTest extends AbstractActorTest { throw last; } - private T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { + private static T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class); if(reply instanceof Failure) { throw new AssertionError(msg + " failed", ((Failure)reply).cause()); @@ -522,6 +522,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting"); + datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -645,7 +646,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilderWithMockShardActor().cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -661,7 +663,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -714,7 +717,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -729,7 +733,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -757,8 +762,8 @@ public class ShardManagerTest extends AbstractActorTest { String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -766,8 +771,8 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("getMemberName", "member-2", peerDown.getMemberName()); MessageCollectorActor.clearMessages(mockShardActor1); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); @@ -775,8 +780,8 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForReachableMember(); @@ -790,21 +795,21 @@ public class ShardManagerTest extends AbstractActorTest { String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); // Test FindPrimary wait succeeds after reachable member event. - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); shardManager1.tell(new FindPrimary("default", true), getRef()); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path2 = found2.getPrimaryPath(); @@ -829,8 +834,8 @@ public class ShardManagerTest extends AbstractActorTest { final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( - new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(), - shardManagerID); + new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -845,7 +850,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -875,8 +881,8 @@ public class ShardManagerTest extends AbstractActorTest { primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent())); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -1352,6 +1358,8 @@ public class ShardManagerTest extends AbstractActorTest { public void testRestoreFromSnapshot() throws Throwable { LOG.info("testRestoreFromSnapshot starting"); + datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS); + JavaTestKit kit = new JavaTestKit(getSystem()); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1405,6 +1413,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { + LOG.info("testAddShardReplica starting"); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")). @@ -1419,7 +1428,7 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); final TestActorRef newReplicaShardManager = TestActorRef.create(system1, newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. final ActorSystem system2 = newActorSystem("Member2"); @@ -1427,10 +1436,12 @@ public class ShardManagerTest extends AbstractActorTest { String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + TestActorRef.create(system2, Props.create(MockRespondActor.class). + withDispatcher(Dispatchers.DefaultDispatcherId()), name); final TestActorRef leaderShardManager = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -1477,7 +1488,6 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), Sets.newHashSet(shardManagerSnapshot.getShardList())); }}; - LOG.info("testAddShardReplica ending"); } @@ -1614,6 +1624,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting"); + datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("astronauts", Arrays.asList("member-2")).build()); @@ -1622,7 +1633,8 @@ public class ShardManagerTest extends AbstractActorTest { shardActor(mockShardActor).props(), shardMgrID); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", + AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString()); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); @@ -1691,7 +1703,7 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef newReplicaShardManager = TestActorRef.create(system1, newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster( - new ClusterWrapperImpl(system1)).props(), + new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -1706,7 +1718,7 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef leaderShardManager = TestActorRef.create(system2, newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster( - new ClusterWrapperImpl(system2)).props(), + new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so, @@ -1777,7 +1789,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange, - final Class firstForwardedServerChangeClass, + final Class firstForwardedServerChangeClass, final Object secondServerChange) throws Exception { new JavaTestKit(getSystem()) {{ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); @@ -1789,7 +1801,7 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager = TestActorRef.create(getSystem(), newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster( - new MockClusterWrapper()).props(), + new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); @@ -1922,22 +1934,23 @@ public class ShardManagerTest extends AbstractActorTest { String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1"). type(shardMrgIDSuffix).build().toString(); TestActorRef shard1 = actorFactory.createTestActor( - MessageCollectorActor.props(), shardId1); + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1); String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1"). type(shardMrgIDSuffix).build().toString(); TestActorRef shard2 = actorFactory.createTestActor( - MessageCollectorActor.props(), shardId2); + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2); TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder( - mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); + mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props(). + withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), shard1); shardManager.tell(new ActorInitialized(), shard2); FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - Future stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown()); + Future stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE); MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class); MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class); @@ -2001,22 +2014,22 @@ public class ShardManagerTest extends AbstractActorTest { if(message instanceof FindPrimary) { findPrimaryMessageReceived.countDown(); } else if(message instanceof ClusterEvent.MemberUp) { - String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberUpReceived.countDown(); } } else if(message instanceof ClusterEvent.MemberRemoved) { - String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberRemovedReceived.countDown(); } } else if(message instanceof ClusterEvent.UnreachableMember) { - String role = ((ClusterEvent.UnreachableMember)message).member().roles().head(); + String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberUnreachableReceived.countDown(); } } else if(message instanceof ClusterEvent.ReachableMember) { - String role = ((ClusterEvent.ReachableMember)message).member().roles().head(); + String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberReachableReceived.countDown(); } @@ -2156,7 +2169,7 @@ public class ShardManagerTest extends AbstractActorTest { boolean canIntercept(Object message); } - private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { + private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { return new MessageInterceptor(){ @Override public Object apply(Object message) {