X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=9ac5c7af61d635a71cf040094efb3605760e0cc0;hb=6a5a8670a47f8989998390b6bab6718c1a7857b5;hp=abb6adb295d2dafccf5cf774727010a0dc2dced3;hpb=769ef0f950f2ed6cfc14d274e6a8edc583a36a96;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index abb6adb295..9ac5c7af61 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -7,57 +7,65 @@ */ package org.opendaylight.controller.cluster.datastore.admin; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent; +import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent; +import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState; import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.AddressFromURIString; import akka.actor.PoisonPill; +import akka.actor.Status.Success; import akka.cluster.Cluster; -import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.Member; -import akka.cluster.MemberStatus; -import akka.testkit.JavaTestKit; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.ConfigFactory; import java.io.File; import java.io.FileInputStream; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.junit.After; +import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.MemberNode; +import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; /** * Unit tests for ClusterAdminRpcService. @@ -65,10 +73,14 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public class ClusterAdminRpcServiceTest { - private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - private final List memberNodes = new ArrayList<>(); + @Before + public void setUp() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + } + @After public void tearDown() { for(MemberNode m: memberNodes) { @@ -85,11 +97,11 @@ public class ClusterAdminRpcServiceTest { String fileName = "target/testBackupDatastore"; new File(fileName).delete(); - ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore); + ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore()); RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder(). setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); - checkSuccessfulRpcResult(rpcResult); + verifySuccessfulRpcResult(rpcResult); try(FileInputStream fis = new FileInputStream(fileName)) { List snapshots = SerializationUtils.deserialize(fis); @@ -97,21 +109,21 @@ public class ClusterAdminRpcServiceTest { ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), snapshots.get(1).getType(), snapshots.get(1)); - verifyDatastoreSnapshot(node.configDataStore.getActorContext().getDataStoreType(), - map.get(node.configDataStore.getActorContext().getDataStoreType()), "cars", "people"); + verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(), + map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people"); } finally { new File(fileName).delete(); } // Test failure by killing a shard. - node.configDataStore.getActorContext().getShardManager().tell(node.datastoreContextBuilder. + node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder(). shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); - ActorRef carsShardActor = node.configDataStore.getActorContext().findLocalShard("cars").get(); - node.kit.watch(carsShardActor); + ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get(); + node.kit().watch(carsShardActor); carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - node.kit.expectTerminated(carsShardActor); + node.kit().expectTerminated(carsShardActor); rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()). get(5, TimeUnit.SECONDS); @@ -143,7 +155,7 @@ public class ClusterAdminRpcServiceTest { leaderNode1.waitForMembersUp("member-2"); - testAddShardReplica(newReplicaNode2, "cars", "member-1"); + doAddShardReplica(newReplicaNode2, "cars", "member-1"); MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). moduleShardsConfig(moduleShardsConfig).build(); @@ -151,17 +163,17 @@ public class ClusterAdminRpcServiceTest { leaderNode1.waitForMembersUp("member-3"); newReplicaNode2.waitForMembersUp("member-3"); - testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); + doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); - verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3"); - verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3"); + verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3"); // Write data to member-2's config datastore and read/verify via member-3 - NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore, - newReplicaNode3.configDataStore); + NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(), + newReplicaNode3.configDataStore()); // Write data to member-3's oper datastore and read/verify via member-2 - writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore); + writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore()); // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 - // 2 ServerConfigurationPayload entries and the transaction payload entry. @@ -174,25 +186,49 @@ public class ClusterAdminRpcServiceTest { } }; - verifyRaftState(leaderNode1.configDataStore, "cars", verifier); - verifyRaftState(leaderNode1.operDataStore, "cars", verifier); + verifyRaftState(leaderNode1.configDataStore(), "cars", verifier); + verifyRaftState(leaderNode1.operDataStore(), "cars", verifier); - verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier); - verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier); + verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier); + verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier); - verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); - verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier); + verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier); + verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier); // Restart member-3 and verify the cars config shard is re-instated. - Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress()); + Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress()); newReplicaNode3.cleanup(); newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build(); - verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); - readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode); + verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier); + readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode); + } + + @Test + public void testAddShardReplicaFailures() throws Exception { + String name = "testAddShardReplicaFailures"; + MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name). + moduleShardsConfig("module-shards-cars-member-1.conf").build(); + + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore()); + + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder(). + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars"). + build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people"). + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + service.close(); } private NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, @@ -219,66 +255,32 @@ public class ClusterAdminRpcServiceTest { assertEquals("Data node", expCarsNode, optional.get()); } - private void testAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) + private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore, - memberNode.operDataStore); + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore()); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). - build()).get(10, TimeUnit.SECONDS); - checkSuccessfulRpcResult(rpcResult); + setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); - verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames); - verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames); + verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - service.close(); - } + Optional optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName); + assertEquals("Oper shard present", false, optional.isPresent()); - private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames) - throws Exception { - final Set peerIds = Sets.newHashSet(); - for(String p: peerMemberNames) { - peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName). - type(datastore.getActorContext().getDataStoreType()).build().toString()); - } + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). + setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); - verifyRaftState(datastore, shardName, new RaftStateVerifier() { - @Override - public void verify(OnDemandRaftState raftState) { - assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName, - raftState.getPeerAddresses().keySet().containsAll(peerIds)); - } - }); - } - - private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) - throws Exception { - ActorContext actorContext = datastore.getActorContext(); - - Future future = actorContext.findLocalShardAsync(shardName); - ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); - - AssertionError lastError = null; - Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 5) { - OnDemandRaftState raftState = (OnDemandRaftState)actorContext. - executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); - - try { - verifier.verify(raftState); - return; - } catch (AssertionError e) { - lastError = e; - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } - } + verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); - throw lastError; + service.close(); } - private void checkSuccessfulRpcResult(RpcResult rpcResult) { + private T verifySuccessfulRpcResult(RpcResult rpcResult) { if(!rpcResult.isSuccessful()) { if(rpcResult.getErrors().size() > 0) { RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); @@ -287,134 +289,275 @@ public class ClusterAdminRpcServiceTest { fail("Rpc failed with no error"); } - } - @Test - public void testRemoveShardReplica() { - // TODO implement + return rpcResult.getResult(); } - @Test - public void testAddReplicasForAllShards() { - // TODO implement + private void verifyFailedRpcResult(RpcResult rpcResult) { + assertEquals("RpcResult", false, rpcResult.isSuccessful()); + assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size()); + RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + assertNotNull("RpcResult error message null", error.getMessage()); } @Test - public void testRemoveAllShardReplicas() { - // TODO implement - } + public void testRemoveShardReplica() throws Exception { + String name = "testRemoveShardReplicaLocal"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig). + datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); - @Test - public void testConvertMembersToVotingForAllShards() { - // TODO implement + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + // Invoke RPC service on member-3 to remove it's local shard + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder(). + setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + service3.close(); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); + verifyNoShardPresent(replicaNode3.configDataStore(), "cars"); + + // Restart member-2 and verify member-3 isn't present. + + Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress()); + replicaNode2.cleanup(); + + replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); + + // Invoke RPC service on member-1 to remove member-2 + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore()); + + rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder(). + setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + service1.close(); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars"); + verifyNoShardPresent(replicaNode2.configDataStore(), "cars"); } @Test - public void testConvertMembersToNonvotingForAllShards() { - // TODO implement - } + public void testRemoveShardLeaderReplica() throws Exception { + String name = "testRemoveShardLeaderReplica"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig). + datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); - private static class MemberNode { - IntegrationTestKit kit; - DistributedDataStore configDataStore; - DistributedDataStore operDataStore; - final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); - boolean cleanedUp; + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); - static Builder builder(List members) { - return new Builder(members); - } + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); - void waitForMembersUp(String... otherMembers) { - Set otherMembersSet = Sets.newHashSet(otherMembers); - Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 10) { - CurrentClusterState state = Cluster.get(kit.getSystem()).state(); - for(Member m: state.getMembers()) { - if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) && - otherMembersSet.isEmpty()) { - return; - } - } - - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); - fail("Member(s) " + otherMembersSet + " are not Up"); - } + replicaNode2.waitForMembersUp("member-1", "member-3"); + replicaNode2.waitForMembersUp("member-1", "member-2"); - void cleanup() { - if(!cleanedUp) { - cleanedUp = true; - kit.cleanup(configDataStore); - kit.cleanup(operDataStore); - JavaTestKit.shutdownActorSystem(kit.getSystem()); - } - } + // Invoke RPC service on leader member-1 to remove it's local shard - static class Builder { - List members; - String moduleShardsConfig; - String akkaConfig; - String[] waitForshardLeader = new String[0]; - String testName; - boolean createOperDatastore = true; + ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore()); - Builder(List members) { - this.members = members; - } + RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder(). + setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + service1.close(); - Builder moduleShardsConfig(String moduleShardsConfig) { - this.moduleShardsConfig = moduleShardsConfig; - return this; + verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"), + containsString("member-3"))); } + }); - Builder akkaConfig(String akkaConfig) { - this.akkaConfig = akkaConfig; - return this; - } + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2"); + verifyNoShardPresent(leaderNode1.configDataStore(), "cars"); + } - Builder testName(String testName) { - this.testName = testName; - return this; - } + @Test + public void testAddReplicasForAllShards() throws Exception { + String name = "testAddReplicasForAllShards"; + String moduleShardsConfig = "module-shards-member1.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build(); - Builder waitForShardLeader(String... shardNames) { - this.waitForshardLeader = shardNames; - return this; - } + ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", + "pets", null, Arrays.asList("member-1")); + leaderNode1.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); + leaderNode1.kit().expectMsgClass(Success.class); + leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets"); - Builder createOperDatastore(boolean value) { - this.createOperDatastore = value; - return this; - } + MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); - MemberNode build() { - MemberNode node = new MemberNode(); - ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); - Cluster.get(system).join(MEMBER_1_ADDRESS); + leaderNode1.waitForMembersUp("member-2"); + newReplicaNode2.waitForMembersUp("member-1"); + + newReplicaNode2.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef()); + newReplicaNode2.kit().expectMsgClass(Success.class); + + newReplicaNode2.operDataStore().getActorContext().getShardManager().tell( + new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module", + "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null), + newReplicaNode2.kit().getRef()); + newReplicaNode2.kit().expectMsgClass(Success.class); + + ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), + newReplicaNode2.operDataStore()); + + RpcResult rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS); + AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("pets", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational), + failedShardResult("no-leader", DataStoreType.Operational)); + + verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1"); + verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1"); + verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1"); + verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1"); + verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1"); + + service.close(); + } + + @Test + public void testRemoveAllShardReplicas() throws Exception { + String name = "testRemoveAllShardReplicas"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", + "pets", null, Arrays.asList("member-1", "member-2", "member-3")); + leaderNode1.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); + leaderNode1.kit().expectMsgClass(Success.class); + + replicaNode2.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef()); + replicaNode2.kit().expectMsgClass(Success.class); + + replicaNode3.configDataStore().getActorContext().getShardManager().tell( + new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef()); + replicaNode3.kit().expectMsgClass(Success.class); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2"); + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.removeAllShardReplicas( + new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS); + RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("pets", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2"); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1"); + verifyNoShardPresent(replicaNode3.configDataStore(), "cars"); + verifyNoShardPresent(replicaNode3.configDataStore(), "people"); + verifyNoShardPresent(replicaNode3.configDataStore(), "pets"); + + service3.close(); + } - node.kit = new IntegrationTestKit(system, node.datastoreContextBuilder); + @Test + public void testConvertMembersToVotingForAllShards() { + // TODO implement + } - String memberName = new ClusterWrapperImpl(system).getCurrentMemberName(); - node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName); - node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig, - true, waitForshardLeader); + @Test + public void testConvertMembersToNonvotingForAllShards() { + // TODO implement + } - if(createOperDatastore) { - node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName); - node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig, - true, waitForshardLeader); - } + private void verifyShardResults(List shardResults, ShardResult... expShardResults) { + Map expResultsMap = new HashMap<>(); + for(ShardResult r: expShardResults) { + expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r); + } - members.add(node); - return node; + for(ShardResult result: shardResults) { + ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType()); + assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(), + result.getDataStoreType()), exp); + assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded()); + if(exp.isSucceeded()) { + assertNull("Expected null error message", result.getErrorMessage()); + } else { + assertNotNull("Expected error message", result.getErrorMessage()); } } + + if(!expResultsMap.isEmpty()) { + fail("Missing shard results for " + expResultsMap.keySet()); + } + } + + private ShardResult successShardResult(String shardName, DataStoreType type) { + return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build(); } - private static interface RaftStateVerifier { - void verify(OnDemandRaftState raftState); + private ShardResult failedShardResult(String shardName, DataStoreType type) { + return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build(); } }