Fix intermittent failure in ClusterAdminRpcServiceTest
[controller.git] / opendaylight / md-sal / sal-cluster-admin / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
index 9bfc7e6b1a55fc37b6527bd2b03ffdde48e71f62..08996d6f009d1b10d9e76e10d88bd5ed854e118f 100644 (file)
@@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
@@ -22,32 +23,43 @@ import akka.actor.PoisonPill;
 import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.net.URI;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -57,10 +69,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -73,6 +90,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Thomas Pantelis
  */
 public class ClusterAdminRpcServiceTest {
+    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+    private static final MemberName MEMBER_3 = MemberName.forName("member-3");
     private final List<MemberNode> memberNodes = new ArrayList<>();
 
     @Before
@@ -83,9 +103,10 @@ public class ClusterAdminRpcServiceTest {
 
     @After
     public void tearDown() {
-        for(MemberNode m: memberNodes) {
+        for (MemberNode m : Lists.reverse(memberNodes)) {
             m.cleanup();
         }
+        memberNodes.clear();
     }
 
     @Test
@@ -129,8 +150,6 @@ public class ClusterAdminRpcServiceTest {
                 get(5, TimeUnit.SECONDS);
         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
         assertEquals("getErrors", 1, rpcResult.getErrors().size());
-
-        service.close();
     }
 
     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
@@ -227,8 +246,6 @@ public class ClusterAdminRpcServiceTest {
         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
         verifyFailedRpcResult(rpcResult);
-
-        service.close();
     }
 
     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
@@ -276,8 +293,6 @@ public class ClusterAdminRpcServiceTest {
         verifySuccessfulRpcResult(rpcResult);
 
         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
-
-        service.close();
     }
 
     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
@@ -329,7 +344,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service3.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
@@ -354,7 +368,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
@@ -381,7 +394,7 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
 
         replicaNode2.waitForMembersUp("member-1", "member-3");
-        replicaNode2.waitForMembersUp("member-1", "member-2");
+        replicaNode3.waitForMembersUp("member-1", "member-2");
 
         // Invoke RPC service on leader member-1 to remove it's local shard
 
@@ -392,7 +405,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
             @Override
@@ -415,7 +427,7 @@ public class ClusterAdminRpcServiceTest {
                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
 
         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
-                "pets", null, Arrays.asList("member-1"));
+                "pets", null, Arrays.asList(MEMBER_1));
         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
         leaderNode1.kit().expectMsgClass(Success.class);
@@ -433,7 +445,7 @@ public class ClusterAdminRpcServiceTest {
 
         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
-                        "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
+                        "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
                                 newReplicaNode2.kit().getRef());
         newReplicaNode2.kit().expectMsgClass(Success.class);
 
@@ -454,8 +466,6 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
-
-        service.close();
     }
 
     @Test
@@ -478,7 +488,7 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
 
         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
-                "pets", null, Arrays.asList("member-1", "member-2", "member-3"));
+                "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
         leaderNode1.kit().expectMsgClass(Success.class);
@@ -516,18 +526,384 @@ public class ClusterAdminRpcServiceTest {
         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+    }
+
+    @Test
+    public void testChangeMemberVotingStatesForShard() throws Exception {
+        String name = "testChangeMemberVotingStatusForShard";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        replicaNode3.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
 
-        service3.close();
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<Void> rpcResult = service3.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
+
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
     }
 
     @Test
-    public void testConvertMembersToVotingForAllShards() {
-        // TODO implement
+    public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
+        String name = "testChangeMemberVotingStatesForSingleNodeShard";
+        String moduleShardsConfig = "module-shards-member1.conf";
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        leaderNode.configDataStore().waitTillReady();
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+                leaderNode.operDataStore());
+
+        RpcResult<Void> rpcResult = service.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifyFailedRpcResult(rpcResult);
+
+        verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true));
+    }
+
+    @Test
+    public void testChangeMemberVotingStatesForAllShards() throws Exception {
+        String name = "testChangeMemberVotingStatesForAllShards";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        replicaNode3.configDataStore().waitTillReady();
+        replicaNode3.operDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
+                new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
     }
 
     @Test
-    public void testConvertMembersToNonvotingForAllShards() {
-        // TODO implement
+    public void testFlipMemberVotingStates() throws Exception {
+        String name = "testFlipMemberVotingStates";
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        replicaNode3.configDataStore().waitTillReady();
+        replicaNode3.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", true));
+
+        // Leadership should have transferred to member 3 since it is the only remaining voting member.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-3"));
+        });
+
+        verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-3"));
+        });
+
+        // Flip the voting states back to the original states.
+
+        rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+        result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", false));
+
+        // Leadership should have transferred to member 1 or 2.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+        String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+        // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+        // non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+                new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+                new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+        replicaNode1.waitForMembersUp("member-2", "member-3");
+
+        verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+                new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+                new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+                new SimpleEntry<>("member-6", true));
+
+        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
+            assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+                replicaNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+                new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+        // Since member 1 was changed to voting and there was no leader, it should've started and election
+        // and become leader
+        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1"));
+        });
+
+        verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1"));
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+        String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+        // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+                new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+                new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+                new SimpleEntry<>("member-6", false));
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+                new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+        // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+        // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+        });
+    }
+
+    private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+            String member, String datastoreTypeSuffix, String... shards) {
+        String[] datastoreTypes = {"config_", "oper_"};
+        for(String type: datastoreTypes) {
+            for(String shard: shards) {
+                List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+                for(ServerInfo info: serverConfig.getServerConfig()) {
+                    newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
+                            type + datastoreTypeSuffix).toString(), info.isVoting()));
+                }
+
+                String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+                        type + datastoreTypeSuffix).toString();
+                InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+                InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+                        new ServerConfigurationPayload(newServerInfo)));
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        for(DistributedDataStore datastore: datastores) {
+            for(String shard: shards) {
+                verifyVotingStates(datastore, shard, expStates);
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
+        Map<String, Boolean> expStateMap = new HashMap<>();
+        for(Entry<String, Boolean> e: expStates) {
+            expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
+                    datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+        }
+
+        verifyRaftState(datastore, shardName, raftState -> {
+            String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
+                    datastore.getActorContext().getDataStoreName()).toString();
+            assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+            for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+            }
+        });
     }
 
     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {