X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=4a6754bbb3b2cb665bc328d0938007365ced577c;hb=186c5d82335ed7d8c39472355f7b1c1e084c26cd;hp=77250896d910d50fd0f150b089dfd2c8c69f8428;hpb=e9cf78d1c39bbad20b8c8fee330b4a010ef14318;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 77250896d9..4a6754bbb3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -83,6 +83,9 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -1060,16 +1063,96 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { - new JavaTestKit(getSystem()) {{ - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). - put("default", Arrays.asList("member-1", "member-2")). - put("astronauts", Arrays.asList("member-2")).build()); + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig)); + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - expectMsgClass(duration("2 seconds"), Status.Success.class); - }}; + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = ActorSystem.create("cluster-test", + ConfigFactory.load().getConfig("Member2")); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor, + new ClusterWrapperImpl(system2), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + AddServer.class); + String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; + assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); + + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test + public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); + newReplicaShardManager.underlyingActor().waitForMemberUp(); + + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, + (resp.cause() instanceof RuntimeException)); + }}; + + JavaTestKit.shutdownActorSystem(system1); } @Test @@ -1248,4 +1331,24 @@ public class ShardManagerTest extends AbstractActorTest { findPrimaryMessageReceived = new CountDownLatch(1); } } + + private static class MockRespondActor extends MessageCollectorActor { + + private Object responseMsg; + + public void updateResponse(Object response) { + responseMsg = response; + } + + @Override + public void onReceive(Object message) throws Exception { + super.onReceive(message); + if (message instanceof AddServer) { + if (responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + responseMsg = null; + } + } + } + } }