X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=420ce9563865f07cafc1a7cc70f4bc5d3caba3a5;hb=9ba29cd861d75d328aa67d53726250226dc8e438;hp=5cca07428f2e61ddf2b6700bb32e55a7abfb1b67;hpb=be338c9e1dab83e2a5ff21819b92b934ef32faee;p=controller.git diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 5cca07428f..420ce95638 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -64,35 +64,53 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; @@ -137,7 +155,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); - RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() + RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -169,8 +187,8 @@ public class ClusterAdminRpcServiceTest { assertEquals("getErrors", 1, rpcResult.getErrors().size()); } - private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, - String... expShardNames) { + private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot, + final String... expShardNames) { assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot); Set shardNames = new HashSet<>(); for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) { @@ -232,6 +250,109 @@ public class ClusterAdminRpcServiceTest { verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); } + @Test + public void testGetShardRole() throws Exception { + String name = "testGetShardRole"; + String moduleShardsConfig = "module-shards-default-member-1.conf"; + + final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + + member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default"); + + final RpcResult successResult = + getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default"); + verifySuccessfulRpcResult(successResult); + assertEquals("Leader", successResult.getResult().getRole()); + + final RpcResult failedResult = + getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars"); + + verifyFailedRpcResult(failedResult); + + final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager(); + + shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), + "prefix", Collections.singleton(MEMBER_1))), + ActorRef.noSender()); + + member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), + ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); + + final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); + final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); + Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); + + final RpcResult prefixSuccessResult = + getPrefixShardRole(member1, identifier, serializer); + + verifySuccessfulRpcResult(prefixSuccessResult); + assertEquals("Leader", prefixSuccessResult.getResult().getRole()); + + final InstanceIdentifier peopleId = InstanceIdentifier.create(People.class); + Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId); + + final RpcResult prefixFail = + getPrefixShardRole(member1, peopleId, serializer); + + verifyFailedRpcResult(prefixFail); + } + + @Test + public void testGetPrefixShardRole() throws Exception { + String name = "testGetPrefixShardRole"; + String moduleShardsConfig = "module-shards-default-member-1.conf"; + + final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + + member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default"); + + + } + + @Test + public void testModuleShardLeaderMovement() throws Exception { + String name = "testModuleShardLeaderMovement"; + String moduleShardsConfig = "module-shards-member1.conf"; + + final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + + member1.waitForMembersUp("member-2", "member-3"); + replicaNode2.waitForMembersUp("member-1"); + replicaNode3.waitForMembersUp("member-1", "member-2"); + + doAddShardReplica(replicaNode2, "cars", "member-1"); + doAddShardReplica(replicaNode3, "cars", "member-1", "member-2"); + + verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3"); + + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + doMakeShardLeaderLocal(member1, "cars", "member-1"); + verifyRaftState(replicaNode2.configDataStore(), "cars", + raftState -> assertThat(raftState.getLeader(),containsString("member-1"))); + verifyRaftState(replicaNode3.configDataStore(), "cars", + raftState -> assertThat(raftState.getLeader(),containsString("member-1"))); + + doMakeShardLeaderLocal(replicaNode2, "cars", "member-2"); + verifyRaftState(member1.configDataStore(), "cars", + raftState -> assertThat(raftState.getLeader(),containsString("member-2"))); + verifyRaftState(replicaNode3.configDataStore(), "cars", + raftState -> assertThat(raftState.getLeader(),containsString("member-2"))); + + replicaNode2.waitForMembersUp("member-3"); + doMakeShardLeaderLocal(replicaNode3, "cars", "member-3"); + } + @Test public void testAddShardReplica() throws Exception { String name = "testAddShardReplica"; @@ -264,12 +385,12 @@ public class ClusterAdminRpcServiceTest { // Write data to member-3's oper datastore and read/verify via member-2 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore()); - // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 - - // 2 ServerConfigurationPayload entries and the transaction payload entry. + // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 - + // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload. RaftStateVerifier verifier = raftState -> { - assertEquals("Commit index", 2, raftState.getCommitIndex()); - assertEquals("Last applied index", 2, raftState.getLastApplied()); + assertEquals("Commit index", 4, raftState.getCommitIndex()); + assertEquals("Last applied index", 4, raftState.getLastApplied()); }; verifyRaftState(leaderNode1.configDataStore(), "cars", verifier); @@ -302,7 +423,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); @@ -315,8 +436,8 @@ public class ClusterAdminRpcServiceTest { verifyFailedRpcResult(rpcResult); } - private static NormalizedNode writeCarsNodeAndVerify(AbstractDataStore writeToStore, - AbstractDataStore readFromStore) throws Exception { + private static NormalizedNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore, + final AbstractDataStore readFromStore) throws Exception { DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); NormalizedNode carsNode = CarsModel.create(); writeTx.write(CarsModel.BASE_PATH, carsNode); @@ -331,19 +452,47 @@ public class ClusterAdminRpcServiceTest { return carsNode; } - private static void readCarsNodeAndVerify(AbstractDataStore readFromStore, - NormalizedNode expCarsNode) throws Exception { - Optional> optional = readFromStore.newReadOnlyTransaction() + private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore, + final NormalizedNode expCarsNode) throws Exception { + java.util.Optional> optional = readFromStore.newReadOnlyTransaction() .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expCarsNode, optional.get()); } - private void addPrefixShardReplica(final MemberNode memberNode, - final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer, - final String shardName, - final String... peerMemberNames) throws Exception { + private static RpcResult getShardRole(final MemberNode memberNode, + final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception { + + final GetShardRoleInput input = new GetShardRoleInputBuilder() + .setDataStoreType(DataStoreType.Config) + .setShardName(shardName) + .build(); + + final ClusterAdminRpcService service = + new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); + + return service.getShardRole(input).get(10, TimeUnit.SECONDS); + } + + private static RpcResult getPrefixShardRole( + final MemberNode memberNode, + final InstanceIdentifier identifier, + final BindingNormalizedNodeSerializer serializer) throws Exception { + + final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder() + .setDataStoreType(DataStoreType.Config) + .setShardPrefix(identifier) + .build(); + + final ClusterAdminRpcService service = + new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); + + return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS); + } + + private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, + final BindingNormalizedNodeSerializer serializer, final String shardName, + final String... peerMemberNames) throws Exception { final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder() .setShardPrefix(identifier) @@ -352,7 +501,8 @@ public class ClusterAdminRpcServiceTest { final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - final RpcResult rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS); + final RpcResult rpcResult = service.addPrefixShardReplica(input) + .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); @@ -360,12 +510,9 @@ public class ClusterAdminRpcServiceTest { assertTrue("Replica shard not present", optional.isPresent()); } - private void removePrefixShardReplica(final MemberNode memberNode, - final InstanceIdentifier identifier, - final String removeFromMember, - final BindingNormalizedNodeSerializer serializer, - final String shardName, - final String... peerMemberNames) throws Exception { + private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, + final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName, + final String... peerMemberNames) throws Exception { final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder() .setDataStoreType(DataStoreType.Config) .setShardPrefix(identifier) @@ -374,21 +521,22 @@ public class ClusterAdminRpcServiceTest { final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - final RpcResult rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS); + final RpcResult rpcResult = service.removePrefixShardReplica(input) + .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); } - private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) - throws Exception { + private static void doAddShardReplica(final MemberNode memberNode, final String shardName, + final String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName) - .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); @@ -403,7 +551,22 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); } - private static T verifySuccessfulRpcResult(RpcResult rpcResult) { + private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName, + final String newLeader) throws Exception { + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore(), null); + + final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() + .setDataStoreType(DataStoreType.Config).setShardName(shardName).build()) + .get(10, TimeUnit.SECONDS); + + verifySuccessfulRpcResult(rpcResult); + + verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(), + containsString(newLeader))); + } + + private static T verifySuccessfulRpcResult(final RpcResult rpcResult) { if (!rpcResult.isSuccessful()) { if (rpcResult.getErrors().size() > 0) { RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -416,7 +579,7 @@ public class ClusterAdminRpcServiceTest { return rpcResult.getResult(); } - private static void verifyFailedRpcResult(RpcResult rpcResult) { + private static void verifyFailedRpcResult(final RpcResult rpcResult) { assertFalse("RpcResult", rpcResult.isSuccessful()); assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size()); RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -449,7 +612,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() + RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()) .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -510,7 +673,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), null); - RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() + RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()) .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -532,7 +695,8 @@ public class ClusterAdminRpcServiceTest { .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build(); ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", - "pets", null, Arrays.asList(MEMBER_1)); + "pets", null, + Collections.singletonList(MEMBER_1)); leaderNode1.configDataStore().getActorContext().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); leaderNode1.kit().expectMsgClass(Success.class); @@ -550,15 +714,17 @@ public class ClusterAdminRpcServiceTest { newReplicaNode2.operDataStore().getActorContext().getShardManager().tell( new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module", - "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null), + "no-leader", null, + Collections.singletonList(MEMBER_1)), + Shard.builder(), null), newReplicaNode2.kit().getRef()); newReplicaNode2.kit().expectMsgClass(Success.class); ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), newReplicaNode2.operDataStore(), null); - RpcResult rpcResult = - service.addReplicasForAllShards().get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service.addReplicasForAllShards( + new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -661,7 +827,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3 + RpcResult rpcResult = service3 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() .setShardName("cars").setDataStoreType(DataStoreType.Config) .setMemberVotingState(ImmutableList.of( @@ -695,7 +861,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), leaderNode.operDataStore(), null); - RpcResult rpcResult = service + RpcResult rpcResult = service .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() .setShardName("cars").setDataStoreType(DataStoreType.Config) .setMemberVotingState(ImmutableList @@ -767,8 +933,8 @@ public class ClusterAdminRpcServiceTest { String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) + .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10)) .build(); final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) @@ -787,8 +953,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -805,19 +971,20 @@ public class ClusterAdminRpcServiceTest { // Leadership should have transferred to member 3 since it is the only remaining voting member. verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { assertNotNull("Expected non-null leader Id", raftState.getLeader()); - assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(), raftState.getLeader().contains("member-3")); }); verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> { assertNotNull("Expected non-null leader Id", raftState.getLeader()); - assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(), raftState.getLeader().contains("member-3")); }); // Flip the voting states back to the original states. - rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS); + rpcResult = service3.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -881,8 +1048,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), replicaNode1.operDataStore(), null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -948,8 +1115,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -973,8 +1140,8 @@ public class ClusterAdminRpcServiceTest { }); } - private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, - String member, String datastoreTypeSuffix, String... shards) { + private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig, + final String member, final String datastoreTypeSuffix, final String... shards) { String[] datastoreTypes = {"config_", "oper_"}; for (String type : datastoreTypes) { for (String shard : shards) { @@ -987,15 +1154,15 @@ public class ClusterAdminRpcServiceTest { String shardID = ShardIdentifier.create(shard, MemberName.forName(member), type + datastoreTypeSuffix).toString(); InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null)); - InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1, + InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1, new ServerConfigurationPayload(newServerInfo))); } } } @SafeVarargs - private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards, - SimpleEntry... expStates) throws Exception { + private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards, + final SimpleEntry... expStates) throws Exception { for (AbstractDataStore datastore: datastores) { for (String shard: shards) { verifyVotingStates(datastore, shard, expStates); @@ -1004,8 +1171,8 @@ public class ClusterAdminRpcServiceTest { } @SafeVarargs - private static void verifyVotingStates(AbstractDataStore datastore, String shardName, - SimpleEntry... expStates) throws Exception { + private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName, + final SimpleEntry... expStates) throws Exception { String localMemberName = datastore.getActorContext().getCurrentMemberName().getName(); Map expStateMap = new HashMap<>(); for (Entry e: expStates) { @@ -1023,7 +1190,7 @@ public class ClusterAdminRpcServiceTest { }); } - private static void verifyShardResults(List shardResults, ShardResult... expShardResults) { + private static void verifyShardResults(final List shardResults, final ShardResult... expShardResults) { Map expResultsMap = new HashMap<>(); for (ShardResult r: expShardResults) { expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r); @@ -1046,11 +1213,11 @@ public class ClusterAdminRpcServiceTest { } } - private static ShardResult successShardResult(String shardName, DataStoreType type) { + private static ShardResult successShardResult(final String shardName, final DataStoreType type) { return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build(); } - private static ShardResult failedShardResult(String shardName, DataStoreType type) { + private static ShardResult failedShardResult(final String shardName, final DataStoreType type) { return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build(); } }