X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerTest.java;h=3b81412bac178017a1f3a97faf85325916a3a142;hp=bd452a73e697ede549f68f38fbed4b5f276284a9;hb=a8000ee3b6071fa3b83500a39fc60ab3a9c5f085;hpb=4e3f49788c05730b29468deebc2aaa4ed0d94eef diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index bd452a73e6..3b81412bac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -86,6 +86,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; @@ -116,8 +117,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -167,7 +170,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.clear(); if(mockShardActor == null) { - mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config"); + mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config"); mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName.toString()); } @@ -194,7 +197,7 @@ public class ShardManagerTest extends AbstractActorTest { } private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { - String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString(); + String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString(); if(system == getSystem()) { return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name); } @@ -1222,10 +1225,10 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig(). getPeerAddressResolver() instanceof ShardPeerAddressResolver); assertEquals("peerMembers", Sets.newHashSet( - new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(), - new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()), + ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(), + ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()), shardBuilder.getPeerAddresses().keySet()); - assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix), + assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix), shardBuilder.getId()); assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); @@ -1442,9 +1445,11 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString(); + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class). + TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class, + new AddServerReply(ServerChangeStatus.OK, memberId2)). withDispatcher(Dispatchers.DefaultDispatcherId()), name); final TestActorRef leaderShardManager = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( @@ -1458,7 +1463,6 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), mockShardLeaderActor); @@ -1478,8 +1482,6 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); //construct a mock response message - AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); - mockShardLeaderActor.underlyingActor().updateResponse(response); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, AddServer.class); @@ -1512,7 +1514,7 @@ public class ShardManagerTest extends AbstractActorTest { String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf( - Props.create(MockRespondActor.class, addServerReply), leaderId); + Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId); MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); @@ -1675,7 +1677,8 @@ public class ShardManagerTest extends AbstractActorTest { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; final TestActorRef respondActor = - TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId); + actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class, + new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); @@ -1686,10 +1689,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), respondActor); - respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef()); final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); - assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(), + assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(), removeServer.getServerId()); expectMsgClass(duration("5 seconds"), Success.class); }}; @@ -1718,9 +1720,11 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString(); + String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString(); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class, + new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name); LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor); @@ -1754,7 +1758,6 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), mockShardLeaderActor); @@ -1771,12 +1774,10 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.underlyingActor().waitForMemberUp(); //construct a mock response message - RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); - mockShardLeaderActor.underlyingActor().updateResponse(response); newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef()); RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, RemoveServer.class); - String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(); + String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; @@ -1866,8 +1867,7 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")). put("people", Arrays.asList("member-1", "member-2")).build()); - String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1). - type(shardMrgIDSuffix).build().toString(); + String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); TestActorRef shard = actorFactory.createTestActor( MessageCollectorActor.props(), shardId); @@ -1939,13 +1939,11 @@ public class ShardManagerTest extends AbstractActorTest { put("shard1", Arrays.asList("member-1")). put("shard2", Arrays.asList("member-1")).build()); - String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1). - type(shardMrgIDSuffix).build().toString(); + String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString(); TestActorRef shard1 = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1); - String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1). - type(shardMrgIDSuffix).build().toString(); + String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString(); TestActorRef shard2 = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2); @@ -1980,6 +1978,62 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testShutDown ending"); } + @Test + public void testChangeServersVotingStatus() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + TestActorRef respondActor = + actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.OK, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), respondActor); + + shardManager.tell(new ChangeShardMembersVotingStatus("default", + ImmutableMap.of("member-2", Boolean.TRUE)), getRef()); + + ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching( + respondActor, ChangeServersVotingStatus.class); + assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(), + ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"), + shardMrgIDSuffix).toString(), Boolean.TRUE)); + + expectMsgClass(duration("5 seconds"), Success.class); + }}; + } + + @Test + public void testChangeServersVotingStatusWithNoLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + TestActorRef respondActor = + actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor); + + shardManager.tell(new ChangeShardMembersVotingStatus("default", + ImmutableMap.of("member-2", Boolean.TRUE)), getRef()); + + MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class); + + Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); + assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException)); + }}; + } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final CountDownLatch snapshotPersist = new CountDownLatch(1); @@ -2187,35 +2241,25 @@ public class ShardManagerTest extends AbstractActorTest { private static class MockRespondActor extends MessageCollectorActor { static final String CLEAR_RESPONSE = "clear-response"; - static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class); - private volatile Object responseMsg; + private Object responseMsg; + private final Class requestClass; @SuppressWarnings("unused") - public MockRespondActor() { - } - - @SuppressWarnings("unused") - public MockRespondActor(Object responseMsg) { + public MockRespondActor(Class requestClass, Object responseMsg) { + this.requestClass = requestClass; this.responseMsg = responseMsg; } - public void updateResponse(Object response) { - responseMsg = response; - } - @Override public void onReceive(Object message) throws Exception { - if(!"get-all-messages".equals(message)) { - LOG.debug("Received message : {}", message); - } - super.onReceive(message); - if (message instanceof AddServer && responseMsg != null) { - getSender().tell(responseMsg, getSelf()); - } else if(message instanceof RemoveServer && responseMsg != null){ - getSender().tell(responseMsg, getSelf()); - } else if(message.equals(CLEAR_RESPONSE)) { + if(message.equals(CLEAR_RESPONSE)) { responseMsg = null; + } else { + super.onReceive(message); + if (message.getClass().equals(requestClass) && responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + } } } }