X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=bb80b2c273d859051a5db1b959bbf63830fa80b2;hb=15c366198fa48eefd94f4d1a72faa9833e988250;hp=cef38b9c1b176268ce184156fae51fc14690fb9e;hpb=88f763ec4ec2bcc1e0fd414ccb2f105f7490b8e9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index cef38b9c1b..bb80b2c273 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -24,7 +25,6 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; import akka.actor.Status.Success; -import akka.actor.Terminated; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; @@ -93,6 +93,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -104,8 +105,11 @@ import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -210,13 +214,23 @@ public class ShardManagerTest extends AbstractActorTest { } private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { - return TestShardManager.builder(datastoreContextBuilder).shardActor(mockShardActor); + return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { + return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor); + } + + private Props newPropsShardMgrWithMockShardActor() { return newTestShardMgrBuilderWithMockShardActor().props(); } + private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) { + return newTestShardMgrBuilderWithMockShardActor(shardActor).props(); + } + + private TestShardManager newTestShardManager() { return newTestShardManager(newShardMgrProps()); } @@ -228,7 +242,7 @@ public class ShardManagerTest extends AbstractActorTest { return shardManager; } - private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { + private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { AssertionError last = null; Stopwatch sw = Stopwatch.createStarted(); while(sw.elapsed(TimeUnit.SECONDS) <= 5) { @@ -246,7 +260,7 @@ public class ShardManagerTest extends AbstractActorTest { throw last; } - private T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { + private static T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class); if(reply instanceof Failure) { throw new AssertionError(msg + " failed", ((Failure)reply).cause()); @@ -508,6 +522,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting"); + datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -631,7 +646,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilderWithMockShardActor().cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -647,7 +663,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -700,7 +717,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -715,7 +733,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -743,8 +762,8 @@ public class ShardManagerTest extends AbstractActorTest { String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -752,8 +771,8 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("getMemberName", "member-2", peerDown.getMemberName()); MessageCollectorActor.clearMessages(mockShardActor1); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); @@ -761,8 +780,8 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForReachableMember(); @@ -776,11 +795,25 @@ public class ShardManagerTest extends AbstractActorTest { String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + // Test FindPrimary wait succeeds after reachable member event. + + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); + shardManager1.underlyingActor().waitForUnreachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + shardManager1.tell(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); + + RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path2 = found2.getPrimaryPath(); + assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config")); }}; LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending"); @@ -801,8 +834,8 @@ public class ShardManagerTest extends AbstractActorTest { final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( - new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(), - shardManagerID); + new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. @@ -817,7 +850,8 @@ public class ShardManagerTest extends AbstractActorTest { final TestActorRef shardManager2 = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -847,8 +881,8 @@ public class ShardManagerTest extends AbstractActorTest { primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent())); - shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. - createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + shardManager1.tell(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -1324,6 +1358,8 @@ public class ShardManagerTest extends AbstractActorTest { public void testRestoreFromSnapshot() throws Throwable { LOG.info("testRestoreFromSnapshot starting"); + datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS); + JavaTestKit kit = new JavaTestKit(getSystem()); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1377,6 +1413,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { + LOG.info("testAddShardReplica starting"); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")). @@ -1391,7 +1428,7 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); final TestActorRef newReplicaShardManager = TestActorRef.create(system1, newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster( - new ClusterWrapperImpl(system1)).props(), shardManagerID); + new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. final ActorSystem system2 = newActorSystem("Member2"); @@ -1399,10 +1436,12 @@ public class ShardManagerTest extends AbstractActorTest { String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + TestActorRef.create(system2, Props.create(MockRespondActor.class). + withDispatcher(Dispatchers.DefaultDispatcherId()), name); final TestActorRef leaderShardManager = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( - new ClusterWrapperImpl(system2)).props(), shardManagerID); + new ClusterWrapperImpl(system2)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); new JavaTestKit(system1) {{ @@ -1449,7 +1488,6 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), Sets.newHashSet(shardManagerSnapshot.getShardList())); }}; - LOG.info("testAddShardReplica ending"); } @@ -1579,36 +1617,14 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplicaWithAlreadyInProgress() throws Exception { - LOG.info("testAddShardReplicaWithAlreadyInProgress starting"); - new JavaTestKit(getSystem()) {{ - JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); - JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); - - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder(). - put("astronauts", Arrays.asList("member-2")).build()); - - final TestActorRef shardManager = actorFactory.createTestActor( - newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID); - shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); - - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - - mockShardLeaderKit.expectMsgClass(AddServer.class); - - shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef()); - - secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); - }}; - - LOG.info("testAddShardReplicaWithAlreadyInProgress ending"); + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new AddShardReplica("astronauts")); } @Test public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting"); + datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("astronauts", Arrays.asList("member-2")).build()); @@ -1617,7 +1633,8 @@ public class ShardManagerTest extends AbstractActorTest { shardActor(mockShardActor).props(), shardMgrID); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", + AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString()); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); @@ -1634,12 +1651,171 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - shardManager.tell(new RemoveShardReplica("model-inventory"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef()); + Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, - (resp.cause() instanceof IllegalArgumentException)); + (resp.cause() instanceof PrimaryNotFoundException)); }}; + } + @Test + /** + * Primary is Local + */ + public void testRemoveShardReplicaLocal() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final TestActorRef respondActor = + TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), respondActor); + + respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); + shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef()); + final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); + assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(), + removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Success.class); + }}; + } + + @Test + public void testRemoveShardReplicaRemote() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-1")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = newActorSystem("Member1"); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster( + new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = newActorSystem("Member2"); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + + LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor); + + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster( + new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + shardManagerID); + + // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1 + // However when a shard manager has a local shard which is a follower and a leader that is remote it will + // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will + // look like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1 + // In this specific case if we did a FindPrimary for shard default from member-1 we would come up + // with the address of an actor which does not exist, therefore any message sent to that actor would go to + // dead letters. + // To work around this problem we create a ForwardingActor with the right address and pass to it the + // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every + // thing works as expected + final ActorRef actorRef = leaderShardManager.underlyingActor().context() + .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix); + + LOG.error("Forwarding actor : {}", actorRef); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor); + newReplicaShardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef()); + RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + RemoveServer.class); + String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(); + assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + } + + @Test + public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"), + RemoveServer.class, new RemoveShardReplica("astronauts", "member-3")); + } + + @Test + public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new RemoveShardReplica("astronauts", "member-2")); + } + + + public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange, + final Class firstForwardedServerChangeClass, + final Object secondServerChange) throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put(shardName, Arrays.asList("member-2")).build()); + + final TestActorRef shardManager = TestActorRef.create(getSystem(), + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster( + new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + shardMgrID); + + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(firstServerChange, getRef()); + + mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass); + + shardManager.tell(secondServerChange, secondRequestKit.getRef()); + + secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); + }}; } @Test @@ -1690,8 +1866,6 @@ public class ShardManagerTest extends AbstractActorTest { TestActorRef shardManager = actorFactory.createTestActor( newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); - watch(shard); - shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -1705,7 +1879,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); - expectMsgClass(duration("5 seconds"), Terminated.class); + MessageCollectorActor.expectFirstMatching(shard, Shutdown.class); }}; LOG.info("testServerRemovedShardActorRunning ending"); @@ -1748,6 +1922,55 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testShardPersistenceWithRestoredData ending"); } + @Test + public void testShutDown() throws Exception { + LOG.info("testShutDown starting"); + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Arrays.asList("member-1")). + put("shard2", Arrays.asList("member-1")).build()); + + String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard1 = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1); + + String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard2 = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2); + + TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder( + mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props(). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), shard1); + shardManager.tell(new ActorInitialized(), shard2); + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE); + + MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class); + MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class); + + try { + Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + fail("ShardManager actor stopped without waiting for the Shards to be stopped"); + } catch(TimeoutException e) { + // expected + } + + actorFactory.killActor(shard1, this); + actorFactory.killActor(shard2, this); + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + }}; + + LOG.info("testShutDown ending"); + } private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); @@ -1769,7 +1992,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Override - public void handleRecover(Object message) throws Exception { + protected void handleRecover(Object message) throws Exception { try { super.handleRecover(message); } finally { @@ -1791,22 +2014,22 @@ public class ShardManagerTest extends AbstractActorTest { if(message instanceof FindPrimary) { findPrimaryMessageReceived.countDown(); } else if(message instanceof ClusterEvent.MemberUp) { - String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberUpReceived.countDown(); } } else if(message instanceof ClusterEvent.MemberRemoved) { - String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberRemovedReceived.countDown(); } } else if(message instanceof ClusterEvent.UnreachableMember) { - String role = ((ClusterEvent.UnreachableMember)message).member().roles().head(); + String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberUnreachableReceived.countDown(); } } else if(message instanceof ClusterEvent.ReachableMember) { - String role = ((ClusterEvent.ReachableMember)message).member().roles().head(); + String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next(); if(!getCluster().getCurrentMemberName().equals(role)) { memberReachableReceived.countDown(); } @@ -1946,7 +2169,7 @@ public class ShardManagerTest extends AbstractActorTest { boolean canIntercept(Object message); } - private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { + private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { return new MessageInterceptor(){ @Override public Object apply(Object message) { @@ -1962,6 +2185,7 @@ public class ShardManagerTest extends AbstractActorTest { private static class MockRespondActor extends MessageCollectorActor { static final String CLEAR_RESPONSE = "clear-response"; + static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class); private volatile Object responseMsg; @@ -1980,12 +2204,15 @@ public class ShardManagerTest extends AbstractActorTest { @Override public void onReceive(Object message) throws Exception { + if(!"get-all-messages".equals(message)) { + LOG.debug("Received message : {}", message); + } super.onReceive(message); - if (message instanceof AddServer) { - if (responseMsg != null) { - getSender().tell(responseMsg, getSelf()); - } - } if(message.equals(CLEAR_RESPONSE)) { + if (message instanceof AddServer && responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + } else if(message instanceof RemoveServer && responseMsg != null){ + getSender().tell(responseMsg, getSelf()); + } else if(message.equals(CLEAR_RESPONSE)) { responseMsg = null; } }