X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;fp=opendaylight%2Fmd-sal%2Fsal-cluster-admin%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=8ec173d284bc88eeeab4be3c3d581370016f906c;hp=1c78622d5eac0f691b4e196d18ae2a782d4eed35;hb=a8000ee3b6071fa3b83500a39fc60ab3a9c5f085;hpb=30b754bbd3259e926dca9a9b3c20d06be9711c82 diff --git a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 1c78622d5e..8ec173d284 100644 --- a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent; @@ -22,18 +23,21 @@ import akka.actor.PoisonPill; import akka.actor.Status.Success; import akka.cluster.Cluster; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; import java.net.URI; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; @@ -47,8 +51,14 @@ import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; +import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -58,10 +68,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError; @@ -133,8 +148,6 @@ public class ClusterAdminRpcServiceTest { get(5, TimeUnit.SECONDS); assertEquals("isSuccessful", false, rpcResult.isSuccessful()); assertEquals("getErrors", 1, rpcResult.getErrors().size()); - - service.close(); } private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) { @@ -231,8 +244,6 @@ public class ClusterAdminRpcServiceTest { rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people"). setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); - - service.close(); } private static NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, @@ -280,8 +291,6 @@ public class ClusterAdminRpcServiceTest { verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); - - service.close(); } private static T verifySuccessfulRpcResult(RpcResult rpcResult) { @@ -333,7 +342,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service3.close(); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); @@ -358,7 +366,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service1.close(); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars"); verifyNoShardPresent(replicaNode2.configDataStore(), "cars"); @@ -396,7 +403,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service1.close(); verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() { @Override @@ -458,8 +464,6 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1"); verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1"); verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1"); - - service.close(); } @Test @@ -520,18 +524,379 @@ public class ClusterAdminRpcServiceTest { verifyNoShardPresent(replicaNode3.configDataStore(), "cars"); verifyNoShardPresent(replicaNode3.configDataStore(), "people"); verifyNoShardPresent(replicaNode3.configDataStore(), "pets"); + } + + @Test + public void testChangeMemberVotingStatesForShard() throws Exception { + String name = "testChangeMemberVotingStatusForShard"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); - service3.close(); + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.changeMemberVotingStatesForShard( + new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars"). + setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(), + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); + verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); + verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); } @Test - public void testConvertMembersToVotingForAllShards() { - // TODO implement + public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception { + String name = "testChangeMemberVotingStatesForSingleNodeShard"; + String moduleShardsConfig = "module-shards-member1.conf"; + MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + leaderNode.configDataStore().waitTillReady(); + + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), + leaderNode.operDataStore()); + + RpcResult rpcResult = service.changeMemberVotingStatesForShard( + new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars"). + setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true)); + } + + @Test + public void testChangeMemberVotingStatesForAllShards() throws Exception { + String name = "testChangeMemberVotingStatesForAllShards"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.changeMemberVotingStatesForAllShards( + new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(), + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); } @Test - public void testConvertMembersToNonvotingForAllShards() { - // TODO implement + public void testFlipMemberVotingStates() throws Exception { + String name = "testFlipMemberVotingStates"; + + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", true), new ServerInfo("member-2", true), + new ServerInfo("member-3", false))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), + new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false)); + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", true)); + + // Leadership should have transferred to member 3 since it is the only remaining voting member. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-3")); + }); + + verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-3")); + }); + + // Flip the voting states back to the original states. + + rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS); + result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true), + new SimpleEntry<>("member-3", false)); + + // Leadership should have transferred to member 1 or 2. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2")); + }); + } + + @Test + public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception { + String name = "testFlipMemberVotingStatesWithNoInitialLeader"; + + // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially + // non-voting and simulated as down by not starting them up. + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", false), new ServerInfo("member-2", false), + new ServerInfo("member-3", false), new ServerInfo("member-4", true), + new ServerInfo("member-5", true), new ServerInfo("member-6", true))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + // Initially there won't be a leader b/c all the up nodes are non-voting. + + replicaNode1.waitForMembersUp("member-2", "member-3"); + + verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false), + new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false), + new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true), + new SimpleEntry<>("member-6", true)); + + verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> + assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState())); + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), + replicaNode1.operDataStore()); + + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true), + new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false), + new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false)); + + // Since member 1 was changed to voting and there was no leader, it should've started and election + // and become leader + verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1")); + }); + + verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1")); + }); + } + + @Test + public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception { + String name = "testFlipMemberVotingStatesWithVotingMembersDown"; + + // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up. + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", true), new ServerInfo("member-2", true), + new ServerInfo("member-3", true), new ServerInfo("member-4", false), + new ServerInfo("member-5", false), new ServerInfo("member-6", false))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), + new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true), + new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false), + new SimpleEntry<>("member-6", false)); + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore()); + + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + // Members 2 and 3 are now non-voting but should get replicated with the new new server config. + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true), + new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true)); + + // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet + // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1")); + }); + } + + private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, + String member, String datastoreTypeSuffix, String... shards) { + String[] datastoreTypes = {"config_", "oper_"}; + for(String type: datastoreTypes) { + for(String shard: shards) { + List newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size()); + for(ServerInfo info: serverConfig.getServerConfig()) { + newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()), + type + datastoreTypeSuffix).toString(), info.isVoting())); + } + + String shardID = ShardIdentifier.create(shard, MemberName.forName(member), + type + datastoreTypeSuffix).toString(); + InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null)); + InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1, + new ServerConfigurationPayload(newServerInfo))); + } + } + } + + @SafeVarargs + private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards, + SimpleEntry... expStates) throws Exception { + for(DistributedDataStore datastore: datastores) { + for(String shard: shards) { + verifyVotingStates(datastore, shard, expStates); + } + } + } + + @SafeVarargs + private static void verifyVotingStates(DistributedDataStore datastore, String shardName, + SimpleEntry... expStates) throws Exception { + String localMemberName = datastore.getActorContext().getCurrentMemberName().getName(); + Map expStateMap = new HashMap<>(); + for(Entry e: expStates) { + expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()), + datastore.getActorContext().getDataStoreName()).toString(), e.getValue()); + } + + verifyRaftState(datastore, shardName, raftState -> { + String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName), + datastore.getActorContext().getDataStoreName()).toString(); + assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting()); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue()); + } + }); } private static void verifyShardResults(List shardResults, ShardResult... expShardResults) {