X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=1362db752e123014c7a6e832262f7bda4a6f00ac;hp=fa7927ef4219afc29d649e2a0d249a086dc126d2;hb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;hpb=63ea0341c647e2dc4a795d4c48552186c25816bd diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index fa7927ef42..1362db752e 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -11,11 +11,11 @@ import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent; @@ -26,7 +26,6 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Status.Success; import akka.cluster.Cluster; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -44,13 +43,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -58,11 +57,9 @@ import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; @@ -70,42 +67,32 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; -import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -146,7 +133,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); - RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() + RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -156,18 +143,18 @@ public class ClusterAdminRpcServiceTest { ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), snapshots.get(1).getType(), snapshots.get(1)); - verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(), - map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people"); + verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(), + map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people"); } finally { new File(fileName).delete(); } // Test failure by killing a shard. - node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder() + node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder() .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); - ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get(); + ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get(); node.kit().watch(carsShardActor); carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); node.kit().expectTerminated(carsShardActor); @@ -178,8 +165,8 @@ public class ClusterAdminRpcServiceTest { assertEquals("getErrors", 1, rpcResult.getErrors().size()); } - private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, - String... expShardNames) { + private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot, + final String... expShardNames) { assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot); Set shardNames = new HashSet<>(); for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) { @@ -189,107 +176,6 @@ public class ClusterAdminRpcServiceTest { assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames); } - @Test - public void testAddRemovePrefixShardReplica() throws Exception { - String name = "testAddPrefixShardReplica"; - String moduleShardsConfig = "module-shards-default.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.waitForMembersUp("member-2", "member-3"); - replicaNode2.kit().waitForMembersUp("member-1", "member-3"); - replicaNode3.kit().waitForMembersUp("member-1", "member-2"); - - final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - addPrefixShardReplica(replicaNode2, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1"); - - addPrefixShardReplica(replicaNode3, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2"); - - verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-2", "member-3"); - - removePrefixShardReplica(member1, identifier, "member-3", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2"); - - verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-1"); - - removePrefixShardReplica(member1, identifier, "member-2", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - } - - @Test - public void testGetShardRole() throws Exception { - String name = "testGetShardRole"; - String moduleShardsConfig = "module-shards-default-member-1.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default"); - - final RpcResult successResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default"); - verifySuccessfulRpcResult(successResult); - assertEquals("Leader", successResult.getResult().getRole()); - - final RpcResult failedResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars"); - - verifyFailedRpcResult(failedResult); - - final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - final RpcResult prefixSuccessResult = - getPrefixShardRole(member1, identifier, serializer); - - verifySuccessfulRpcResult(prefixSuccessResult); - assertEquals("Leader", prefixSuccessResult.getResult().getRole()); - - final InstanceIdentifier peopleId = InstanceIdentifier.create(People.class); - Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId); - - final RpcResult prefixFail = - getPrefixShardRole(member1, peopleId, serializer); - - verifyFailedRpcResult(prefixFail); - } - @Test public void testGetPrefixShardRole() throws Exception { String name = "testGetPrefixShardRole"; @@ -298,9 +184,7 @@ public class ClusterAdminRpcServiceTest { final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default"); - - + member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default"); } @Test @@ -380,8 +264,8 @@ public class ClusterAdminRpcServiceTest { // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload. RaftStateVerifier verifier = raftState -> { - assertEquals("Commit index", 3, raftState.getCommitIndex()); - assertEquals("Last applied index", 3, raftState.getLastApplied()); + assertEquals("Commit index", 4, raftState.getCommitIndex()); + assertEquals("Last applied index", 4, raftState.getLastApplied()); }; verifyRaftState(leaderNode1.configDataStore(), "cars", verifier); @@ -414,7 +298,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); @@ -427,8 +311,8 @@ public class ClusterAdminRpcServiceTest { verifyFailedRpcResult(rpcResult); } - private static NormalizedNode writeCarsNodeAndVerify(AbstractDataStore writeToStore, - AbstractDataStore readFromStore) throws Exception { + private static NormalizedNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore, + final AbstractDataStore readFromStore) throws Exception { DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); NormalizedNode carsNode = CarsModel.create(); writeTx.write(CarsModel.BASE_PATH, carsNode); @@ -443,102 +327,28 @@ public class ClusterAdminRpcServiceTest { return carsNode; } - private static void readCarsNodeAndVerify(AbstractDataStore readFromStore, - NormalizedNode expCarsNode) throws Exception { - Optional> optional = readFromStore.newReadOnlyTransaction() - .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS); + private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore, + final NormalizedNode expCarsNode) throws Exception { + Optional> optional = readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH) + .get(15, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expCarsNode, optional.get()); } - private RpcResult getShardRole(final MemberNode memberNode, - final BindingNormalizedNodeSerializer serializer, - final String shardName) throws Exception { - - final GetShardRoleInput input = new GetShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardName(shardName) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getShardRole(input).get(10, TimeUnit.SECONDS); - - } - - private RpcResult getPrefixShardRole( - final MemberNode memberNode, - final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer) throws Exception { - - final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS); - - } - - private void addPrefixShardReplica(final MemberNode memberNode, - final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer, - final String shardName, - final String... peerMemberNames) throws Exception { - - final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder() - .setShardPrefix(identifier) - .setDataStoreType(DataStoreType.Config).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - Optional optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName); - assertTrue("Replica shard not present", optional.isPresent()); - } - - private void removePrefixShardReplica(final MemberNode memberNode, - final InstanceIdentifier identifier, - final String removeFromMember, - final BindingNormalizedNodeSerializer serializer, - final String shardName, - final String... peerMemberNames) throws Exception { - final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .setMemberName(removeFromMember).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - } - - private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) - throws Exception { + private static void doAddShardReplica(final MemberNode memberNode, final String shardName, + final String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName) - .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - Optional optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName); + Optional optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName); assertFalse("Oper shard present", optional.isPresent()); rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName) @@ -548,12 +358,12 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); } - private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader) - throws Exception { + private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName, + final String newLeader) throws Exception { ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() + final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() .setDataStoreType(DataStoreType.Config).setShardName(shardName).build()) .get(10, TimeUnit.SECONDS); @@ -561,10 +371,9 @@ public class ClusterAdminRpcServiceTest { verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(), containsString(newLeader))); - } - private static T verifySuccessfulRpcResult(RpcResult rpcResult) { + private static T verifySuccessfulRpcResult(final RpcResult rpcResult) { if (!rpcResult.isSuccessful()) { if (rpcResult.getErrors().size() > 0) { RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -577,7 +386,7 @@ public class ClusterAdminRpcServiceTest { return rpcResult.getResult(); } - private static void verifyFailedRpcResult(RpcResult rpcResult) { + private static void verifyFailedRpcResult(final RpcResult rpcResult) { assertFalse("RpcResult", rpcResult.isSuccessful()); assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size()); RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -610,7 +419,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() + RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()) .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -671,7 +480,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), null); - RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() + RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()) .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -695,10 +504,10 @@ public class ClusterAdminRpcServiceTest { ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", "pets", null, Collections.singletonList(MEMBER_1)); - leaderNode1.configDataStore().getActorContext().getShardManager().tell( + leaderNode1.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); leaderNode1.kit().expectMsgClass(Success.class); - leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets"); + leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets"); MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); @@ -706,11 +515,11 @@ public class ClusterAdminRpcServiceTest { leaderNode1.waitForMembersUp("member-2"); newReplicaNode2.waitForMembersUp("member-1"); - newReplicaNode2.configDataStore().getActorContext().getShardManager().tell( + newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef()); newReplicaNode2.kit().expectMsgClass(Success.class); - newReplicaNode2.operDataStore().getActorContext().getShardManager().tell( + newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell( new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module", "no-leader", null, Collections.singletonList(MEMBER_1)), @@ -721,8 +530,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), newReplicaNode2.operDataStore(), null); - RpcResult rpcResult = - service.addReplicasForAllShards().get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service.addReplicasForAllShards( + new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -760,15 +569,15 @@ public class ClusterAdminRpcServiceTest { ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3)); - leaderNode1.configDataStore().getActorContext().getShardManager().tell( + leaderNode1.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); leaderNode1.kit().expectMsgClass(Success.class); - replicaNode2.configDataStore().getActorContext().getShardManager().tell( + replicaNode2.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef()); replicaNode2.kit().expectMsgClass(Success.class); - replicaNode3.configDataStore().getActorContext().getShardManager().tell( + replicaNode3.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef()); replicaNode3.kit().expectMsgClass(Success.class); @@ -825,7 +634,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3 + RpcResult rpcResult = service3 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() .setShardName("cars").setDataStoreType(DataStoreType.Config) .setMemberVotingState(ImmutableList.of( @@ -859,7 +668,7 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), leaderNode.operDataStore(), null); - RpcResult rpcResult = service + RpcResult rpcResult = service .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() .setShardName("cars").setDataStoreType(DataStoreType.Config) .setMemberVotingState(ImmutableList @@ -931,8 +740,8 @@ public class ClusterAdminRpcServiceTest { String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) + .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10)) .build(); final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) @@ -951,8 +760,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -969,19 +778,20 @@ public class ClusterAdminRpcServiceTest { // Leadership should have transferred to member 3 since it is the only remaining voting member. verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { assertNotNull("Expected non-null leader Id", raftState.getLeader()); - assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(), raftState.getLeader().contains("member-3")); }); verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> { assertNotNull("Expected non-null leader Id", raftState.getLeader()); - assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(), raftState.getLeader().contains("member-3")); }); // Flip the voting states back to the original states. - rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS); + rpcResult = service3.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -1045,8 +855,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), replicaNode1.operDataStore(), null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -1112,8 +922,8 @@ public class ClusterAdminRpcServiceTest { ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards() - .get(10, TimeUnit.SECONDS); + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), @@ -1137,8 +947,8 @@ public class ClusterAdminRpcServiceTest { }); } - private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, - String member, String datastoreTypeSuffix, String... shards) { + private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig, + final String member, final String datastoreTypeSuffix, final String... shards) { String[] datastoreTypes = {"config_", "oper_"}; for (String type : datastoreTypes) { for (String shard : shards) { @@ -1158,8 +968,8 @@ public class ClusterAdminRpcServiceTest { } @SafeVarargs - private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards, - SimpleEntry... expStates) throws Exception { + private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards, + final SimpleEntry... expStates) throws Exception { for (AbstractDataStore datastore: datastores) { for (String shard: shards) { verifyVotingStates(datastore, shard, expStates); @@ -1168,18 +978,18 @@ public class ClusterAdminRpcServiceTest { } @SafeVarargs - private static void verifyVotingStates(AbstractDataStore datastore, String shardName, - SimpleEntry... expStates) throws Exception { - String localMemberName = datastore.getActorContext().getCurrentMemberName().getName(); + private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName, + final SimpleEntry... expStates) throws Exception { + String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName(); Map expStateMap = new HashMap<>(); for (Entry e: expStates) { expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()), - datastore.getActorContext().getDataStoreName()).toString(), e.getValue()); + datastore.getActorUtils().getDataStoreName()).toString(), e.getValue()); } verifyRaftState(datastore, shardName, raftState -> { String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName), - datastore.getActorContext().getDataStoreName()).toString(); + datastore.getActorUtils().getDataStoreName()).toString(); assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting()); for (Entry e: raftState.getPeerVotingStates().entrySet()) { assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue()); @@ -1187,18 +997,19 @@ public class ClusterAdminRpcServiceTest { }); } - private static void verifyShardResults(List shardResults, ShardResult... expShardResults) { + private static void verifyShardResults(final Map shardResults, + final ShardResult... expShardResults) { Map expResultsMap = new HashMap<>(); for (ShardResult r: expShardResults) { expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r); } - for (ShardResult result: shardResults) { + for (ShardResult result: shardResults.values()) { ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType()); assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(), result.getDataStoreType()), exp); - assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded()); - if (exp.isSucceeded()) { + assertEquals("isSucceeded", exp.getSucceeded(), result.getSucceeded()); + if (exp.getSucceeded()) { assertNull("Expected null error message", result.getErrorMessage()); } else { assertNotNull("Expected error message", result.getErrorMessage()); @@ -1210,11 +1021,11 @@ public class ClusterAdminRpcServiceTest { } } - private static ShardResult successShardResult(String shardName, DataStoreType type) { + private static ShardResult successShardResult(final String shardName, final DataStoreType type) { return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build(); } - private static ShardResult failedShardResult(String shardName, DataStoreType type) { + private static ShardResult failedShardResult(final String shardName, final DataStoreType type) { return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build(); } }