Implement cluster admin RPCs to change member voting states 24/39724/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 20 Apr 2016 15:41:25 +0000 (11:41 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 15 Jun 2016 04:44:57 +0000 (04:44 +0000)
Backported from master: https://git.opendaylight.org/gerrit/#/c/38086/

Added 3 new RPCs for changing voting states:
  change-member-voting-states-for-shard
  change-member-voting-states-for-all-shards
  flip-member-voting-states-for-all-shards

These replace the original ones added in Be that weren't implemented.
They were added as placeholders based on how it was thought it would
work at that time.

New related ShardManager messages were added that are sent by the
ClusterAdminRpcService.

The flip-member-voting-states-for-all-shards RPC is a shortcut that
obtains the current voting states via the GetOnDemandRaftState message
to the RaftActor and inverts them. New fields were added to the
OnDemandRaftState response to return the voting states.

Modified the ShardStats JXM bean to report the new OnDemandRaftState
fields.

Added a check in RaftActorServerConfigurationSupport to ensure that
there's at least 1 voting member otherwise one can end up with an
unusable shard with no ability to elect a leader.

Fixed a couple bugs in Leader and AbstractLeader that were found during
testing. AbstractLeader needs to take into account the follower's voting
state when determining if the leader is isolated.

Change-Id: I58686e3ce94d58de7cf289e55bb717ba46bc1de5
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index a1389e7ad13a3de03b6cee5535eed8513b089aba..4e376fe24685c06c5a00d7fe20ee8f6ebabe0304 100644 (file)
@@ -373,8 +373,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Debugging message to retrieve raft stats.
 
         Map<String, String> peerAddresses = new HashMap<>();
-        for(String peerId: context.getPeerIds()) {
-            peerAddresses.put(peerId, context.getPeerAddress(peerId));
+        Map<String, Boolean> peerVotingStates = new HashMap<>();
+        for(PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.getVotingState() != VotingState.NON_VOTING);
+            peerAddresses.put(info.getId(), info.getAddress());
         }
 
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
@@ -392,7 +394,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
+                .isVoting(context.isVotingMember())
                 .peerAddresses(peerAddresses)
+                .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
index 192082f2b6d254af843e19775df2e9e666c10254..1fe31f49df24ada141a2be0088a21c5dca8c41bb 100644 (file)
@@ -702,6 +702,7 @@ class RaftActorServerConfigurationSupport {
             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
                     getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
             if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
+                LOG.debug("Leader changed to non-voting - trying leadership transfer");
                 raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                     @Override
                     public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
@@ -748,9 +749,7 @@ class RaftActorServerConfigurationSupport {
 
             if(tryToElectLeader) {
                 initiateLocalLeaderElection();
-            } else {
-                updateLocalPeerInfo();
-
+            } else if(updateLocalPeerInfo()) {
                 persistNewServerConfiguration(changeVotingStatusContext);
             }
         }
@@ -759,21 +758,39 @@ class RaftActorServerConfigurationSupport {
             LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
 
             ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
-            updateLocalPeerInfo();
+            if(!updateLocalPeerInfo()) {
+                return;
+            }
 
             raftContext.getActor().tell(new ElectionTimeout(), raftContext.getActor());
 
             currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
         }
 
-        private void updateLocalPeerInfo() {
+        private boolean updateLocalPeerInfo() {
             List<ServerInfo> newServerInfoList = newServerInfoList();
 
+            // Check if new voting state would leave us with no voting members.
+            boolean atLeastOneVoting = false;
+            for(ServerInfo info: newServerInfoList) {
+                if(info.isVoting()) {
+                    atLeastOneVoting = true;
+                    break;
+                }
+            }
+
+            if(!atLeastOneVoting) {
+                operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
+                return false;
+            }
+
             raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
             if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
                 leader.updateMinReplicaCount();
             }
+
+            return true;
         }
 
         private List<ServerInfo> newServerInfoList() {
index 0a58b5d708151cc20c5298ffb84cdd190a9a6eff..5271629df175ca99f0d920a3ba44c9f7e0eb177a 100644 (file)
@@ -800,10 +800,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected boolean isLeaderIsolated() {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            if (followerLogInformation.isFollowerActive()) {
+            final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
-                    break;
+                    return false;
                 }
             }
         }
index c76798f3b276c602c3a0190f8749f3e1aca4de89..e5aeb96498ee69ab44c8a0a624c8f91daff51b65 100644 (file)
@@ -46,7 +46,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  * set commitIndex = N (§5.3, §5.4).
  */
 public class Leader extends AbstractLeader {
-    private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+    static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
     private final Stopwatch isolatedLeaderCheck;
     private @Nullable LeadershipTransferContext leadershipTransferContext;
 
@@ -155,7 +155,9 @@ public class Leader extends AbstractLeader {
     @Override
     public void close() throws Exception {
         if(leadershipTransferContext != null) {
-            leadershipTransferContext.transferCohort.abortTransfer();
+            LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
+            leadershipTransferContext = null;
+            localLeadershipTransferContext.transferCohort.abortTransfer();
         }
 
         super.close();
index 57f8beb0054118e8a26549c55dac770e8ee10593..0bd85b1e6d19082eb1f86f0c9f272c3bddafec8a 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft.client.messages;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -34,9 +35,11 @@ public class OnDemandRaftState {
     private String votedFor;
     private boolean isSnapshotCaptureInitiated;
     private String customRaftPolicyClassName;
+    private boolean isVoting;
 
     private List<FollowerInfo> followerInfoList = Collections.emptyList();
     private Map<String, String> peerAddresses = Collections.emptyMap();
+    private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
 
     private OnDemandRaftState() {
     }
@@ -109,6 +112,10 @@ public class OnDemandRaftState {
         return isSnapshotCaptureInitiated;
     }
 
+    public boolean isVoting() {
+        return isVoting;
+    }
+
     public List<FollowerInfo> getFollowerInfoList() {
         return followerInfoList;
     }
@@ -117,6 +124,10 @@ public class OnDemandRaftState {
         return peerAddresses;
     }
 
+    public Map<String, Boolean> getPeerVotingStates() {
+        return peerVotingStates;
+    }
+
     public String getCustomRaftPolicyClassName() {
         return customRaftPolicyClassName;
     }
@@ -199,6 +210,11 @@ public class OnDemandRaftState {
             return this;
         }
 
+        public Builder isVoting(boolean isVoting) {
+            stats.isVoting = isVoting;
+            return this;
+        }
+
         public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
             stats.followerInfoList = followerInfoList;
             return this;
@@ -209,6 +225,11 @@ public class OnDemandRaftState {
             return this;
         }
 
+        public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
+            stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+            return this;
+        }
+
         public Builder isSnapshotCaptureInitiated(boolean value) {
             stats.isSnapshotCaptureInitiated = value;
             return this;
index 8f6a370a922c9964041788e12b318245c34c8919..ca6f34c5c446e499157aac9cc9451e8e6bf1b6cf 100644 (file)
@@ -48,4 +48,9 @@ public enum ServerChangeStatus {
      * An unsupported request, for example removing the leader in a single node cluster.
      */
     NOT_SUPPORTED,
+
+    /**
+     * Some part of the request is invalid.
+     */
+    INVALID_REQUEST,
 }
index a12878cf23c8ef6a6e1db659a1322540c944f4e0..073249fed2d19f7f395d1a5ca1cc43d290dc32af 100644 (file)
@@ -1082,6 +1082,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         LOG.info("testChangeLeaderToNonVoting ending");
     }
 
+    @Test
+    public void testChangeLeaderToNonVotingInSingleNode() {
+        LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(), new MockRaftActorContext()).
+                        withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+
+        leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
+
+        LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
+    }
+
     @Test
     public void testChangeToVotingWithNoLeader() {
         LOG.info("testChangeToVotingWithNoLeader starting");
index 26e5b7bffa320bfae8f098b55bd2c24061c3e127..50fd2fe358303a1a8f3863548b98eefe1c6b2529 100644 (file)
@@ -1816,6 +1816,25 @@ public class LeaderTest extends AbstractLeaderTest {
         Assert.assertTrue(behavior instanceof Leader);
     }
 
+    @Test
+    public void testIsolatedLeaderCheckNoVotingFollowers() {
+        logStart("testIsolatedLeaderCheckNoVotingFollowers");
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
+
+        leader = new Leader(leaderActorContext);
+        leader.getFollower(FOLLOWER_ID).markFollowerActive();
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", behavior instanceof Leader);
+    }
+
     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
index 8db762387b3c382d1e5bcb0ec212a26c1b3bc612..b8e5cd6c2fadd1a6477e167909029910ffde88bb 100644 (file)
@@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
@@ -47,11 +48,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
@@ -66,10 +67,12 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -89,12 +92,16 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -235,9 +242,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
         } else if(message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
         } else if(message instanceof WrappedShardResponse){
@@ -246,6 +250,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
+        } else if(message instanceof ChangeShardMembersVotingStatus){
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if(message instanceof FlipShardMembersVotingStatus){
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
         } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if(message instanceof SaveSnapshotFailure) {
@@ -253,6 +261,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
         } else if(message instanceof Shutdown) {
             onShutDown();
+        } else if(message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
         } else {
             unknownMessage(message);
         }
@@ -327,16 +337,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
-        }
-    }
-
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
         if(isShardReplicaOperationInProgress(shardName, sender)) {
@@ -362,6 +362,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
@@ -1073,8 +1074,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                getSelf().tell(new RunnableMessage() {
+                    @Override
+                    public void run() {
+                        addShard(getShardName(), response, getSender());
+                    }
+                }, getTargetActor());
             }
 
             @Override
@@ -1223,12 +1229,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell(new RunnableMessage() {
+                    @Override
+                    public void run() {
+                        removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
+                    }
+                }, getTargetActor());
             }
         });
     }
@@ -1279,6 +1294,153 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
     }
 
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+        final String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(e.getKey(), shardName).toString(), e.getValue());
+        }
+
+        final ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+        findLocalShard(shardName, getSender(), new Predicate<LocalShardFound>() {
+            @Override
+            public boolean apply(LocalShardFound localShardFound) {
+                changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+                        localShardFound.getPath(), getSender());
+                return true;
+            }
+        });
+    }
+
+    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+        final ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, new Predicate<LocalShardFound>() {
+            @Override
+            public boolean apply(final LocalShardFound localShardFound) {
+                Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                        Timeout.apply(30, TimeUnit.SECONDS));
+
+                future.onComplete(new OnComplete<Object>() {
+                    @Override
+                    public void onComplete(Throwable failure, Object response) {
+                        if (failure != null) {
+                            sender.tell(new Status.Failure(new RuntimeException(
+                                    String.format("Failed to access local shard %s", shardName), failure)), self());
+                            return;
+                        }
+
+                        OnDemandRaftState raftState = (OnDemandRaftState) response;
+                        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                        for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                            serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                        }
+
+                        serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+                                toString(), !raftState.isVoting());
+
+                        changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                                shardName, localShardFound.getPath(), sender);
+                    }
+                }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+                return true;
+            }
+        });
+
+    }
+
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Predicate<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if(response instanceof LocalShardFound) {
+                        getSelf().tell(new RunnableMessage() {
+                            @Override
+                            public void run() {
+                                onLocalShardFound.apply((LocalShardFound) response);
+                            }
+                        }, sender);
+                    } else if(response instanceof LocalShardNotFound) {
+                        String msg = String.format("Local shard %s does not exist", shardName);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+                    } else {
+                        String msg = String.format("Failed to find local shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+                                new RuntimeException(msg)), self());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
+
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path());
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member must be voting",
+                                shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
+
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private static class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
@@ -1710,6 +1872,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private static interface RunnableMessage extends Runnable {
+    }
+
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
      * a remote or local find primary message is processed
@@ -1791,53 +1956,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
-        }
-    }
-
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */
index a67d3de96402827d81557c5cc4f9a4056255e4c7..f0a65e62e7d8279379cc0a16f59045d4c0a80d7f 100644 (file)
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -30,8 +32,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -41,14 +45,19 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -210,22 +219,101 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
             public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
                 return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
             }
-        }, "Failed to add replica");
+        }, "Failed to remove replica");
+    }
+
+    @Override
+    public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if(dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName,
+                memberVotingStates);
+
+        LOG.info("Change member voting states for shard {}: {}", shardName,
+                changeVotingStatus.getMeberVotingStatusMap());
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(Success success) {
+                LOG.info("Successfully changed member voting states for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+            final ChangeMemberVotingStatesForAllShardsInput input) {
+        final List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return toChangeShardMembersVotingStatus(shardName, memberVotingStates);
+            }
+        };
+
+        LOG.info("Change member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, ChangeMemberVotingStatesForAllShardsOutput>() {
+            @Override
+            public ChangeMemberVotingStatesForAllShardsOutput apply(List<ShardResult> shardResults) {
+                return new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to change member voting states");
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
-            ConvertMembersToNonvotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return new FlipShardMembersVotingStatus(shardName);
+            }
+        };
+
+        LOG.info("Flip member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, FlipMemberVotingStatesForAllShardsOutput>() {
+            @Override
+            public FlipMemberVotingStatesForAllShardsOutput apply(List<ShardResult> shardResults) {
+                return new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to change member voting states");
     }
 
     @Override
@@ -253,6 +341,18 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
+    private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+            List<MemberVotingState> memberVotingStatus) {
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(MemberVotingState memberStatus: memberVotingStatus) {
+            serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
+                serverVotingStatusMap);
+        return changeVotingStatus;
+    }
+
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
             final Function<List<ShardResult>, T> resultDataSupplier,
index 0f491570a31eba2f1f0c059f46bc994f5c3d0cbc..b01eb099a5707975515c00d7c661725ff8f16400 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -194,6 +195,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     public String getVotedFor() {
         return getOnDemandRaftState().getVotedFor();
     }
+    @Override
+    public boolean isVoting() {
+        return getOnDemandRaftState().isVoting();
+    }
+
+    @Override
+    public String getPeerVotingStates() {
+        return toStringMap(getOnDemandRaftState().getPeerVotingStates());
+    }
 
     @Override
     public boolean isSnapshotCaptureInitiated() {
@@ -302,17 +312,11 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     @Override
     public String getPeerAddresses() {
-        StringBuilder builder = new StringBuilder();
-        int i = 0;
-        for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
-            if(i++ > 0) {
-                builder.append(", ");
-            }
-
-            builder.append(e.getKey()).append(": ").append(e.getValue());
-        }
+        return toStringMap(getOnDemandRaftState().getPeerAddresses());
+    }
 
-        return builder.toString();
+    private static String toStringMap(Map<?, ?> map) {
+        return Joiner.on(", ").withKeyValueSeparator(": ").join(map);
     }
 
     @Override
@@ -347,6 +351,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         return shard.getPendingTxCommitQueueSize();
     }
 
+    @Override
     public int getTxCohortCacheSize() {
         return shard.getCohortCacheSize();
     }
@@ -357,5 +362,4 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
             shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender());
         }
     }
-
 }
index 39cc22fc2a2b63e64b65ea856298bdcab86ce53f..1ae58e2fe854142915e4ae678a6d714454229565 100644 (file)
@@ -66,6 +66,8 @@ public interface ShardStatsMXBean {
 
    boolean isSnapshotCaptureInitiated();
 
+   boolean isVoting();
+
    void resetTransactionCounters();
 
    long getInMemoryJournalDataSize();
@@ -78,6 +80,8 @@ public interface ShardStatsMXBean {
 
    String getPeerAddresses();
 
+   String getPeerVotingStates();
+
    long getLeadershipChangeCount();
 
    String getLastLeadershipChangeTime();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java
new file mode 100644 (file)
index 0000000..0846f7b
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+
+/**
+ * A local message sent to the ShardManager to change the raft voting status for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class ChangeShardMembersVotingStatus {
+    private final String shardName;
+    private final Map<String, Boolean> meberVotingStatusMap;
+
+    public ChangeShardMembersVotingStatus(String shardName, Map<String, Boolean> meberVotingStatusMap) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.meberVotingStatusMap = ImmutableMap.copyOf(meberVotingStatusMap);
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    public Map<String, Boolean> getMeberVotingStatusMap() {
+        return meberVotingStatusMap;
+    }
+
+    @Override
+    public String toString() {
+        return "ChangeShardMembersVotingStatus [shardName=" + shardName + ", meberVotingStatusMap="
+                + meberVotingStatusMap + "]";
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java
new file mode 100644 (file)
index 0000000..24859a4
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A local message sent to the ShardManager to flip the raft voting states for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class FlipShardMembersVotingStatus {
+    private final String shardName;
+
+    public FlipShardMembersVotingStatus(String shardName) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    @Override
+    public String toString() {
+        return "FlipShardMembersVotingStatus [shardName=" + shardName + "]";
+    }
+}
index 2d81db7117421ded2c4b5d662a74570c33063773..40fb5fbe7118fc18dd7a5c03889fbc31ca48c45f 100644 (file)
@@ -47,7 +47,21 @@ module cluster-admin {
             description "The list of results, one per shard";
         }
     }
-    
+
+    grouping member-voting-states-input {
+        list member-voting-state {
+            leaf member-name {
+                type string;
+            }
+
+            leaf voting {
+                type boolean;
+            }
+
+            description "The list of member voting states";
+        }
+    }
+
     rpc add-shard-replica {
         input {
             leaf shard-name {
@@ -112,40 +126,62 @@ module cluster-admin {
                 description "The cluster member from which the shard replicas should be removed";
             }
         }
-        
+
         output {
             uses shard-result-output;
         }
-        
+
         description "Removes replicas for all shards on this node. This is equivalent to issuing
             a remove-shard-replica for all shards and essentially removes this node from a cluster.";
     }
 
-    rpc convert-members-to-nonvoting-for-all-shards {
+    rpc change-member-voting-states-for-shard {
         input {
-            leaf-list member-names {
+            leaf shard-name {
+                mandatory true;
                 type string;
-                description "The names of the cluster members to convert.";
+                description "The name of the shard for which to change voting state.";
             }
+
+            leaf data-store-type {
+                mandatory true;
+                type data-store-type;
+                description "The type of the data store to which the shard belongs";
+            }
+
+            uses member-voting-states-input;
         }
-        
-        description "Converts the given cluster members to non-voting for all shards. The members will no 
-            longer participate in leader elections and consensus but will be replicated. This is useful for
-            having a set of members serve as a backup cluster in case the primary voting cluster suffers
-            catastrophic failure. This RPC can be issued to any cluster member and will be forwarded
-            to the leader.";
+
+        description "Changes the voting states, either voting or non-voting, of cluster members for a shard.
+            Non-voting members will no longer participate in leader elections and consensus but will be
+            replicated. This is useful for having a set of members serve as a backup cluster in case the
+            primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+            and will be forwarded to the leader.";
     }
 
-    rpc convert-members-to-voting-for-all-shards {
+    rpc change-member-voting-states-for-all-shards {
         input {
-            leaf-list member-names {
-                type string;
-                description "The names of the cluster members to convert.";
-            }
+            uses member-voting-states-input;
+        }
+
+        output {
+            uses shard-result-output;
+        }
+
+        description "Changes the voting states, either voting or non-voting, of cluster members for all shards.
+            Non-voting members will no longer participate in leader elections and consensus but will be
+            replicated. This is useful for having a set of members serve as a backup cluster in case the
+            primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+            and will be forwarded to the leader.";
+    }
+
+    rpc flip-member-voting-states-for-all-shards {
+        output {
+            uses shard-result-output;
         }
 
-        description "Converts the given cluster members to voting for all shards. The members will 
-            participate in leader elections and consensus.";
+        description "Flips the voting states of all cluster members for all shards, such that if a member
+            was voting it becomes non-voting and vice versa.";
     }
 
     rpc backup-datastore {
index cac827745513d8ce5fe48af967310fc713478271..c22810951da5ae6b9b19eb529072fe0b345a3a62 100644 (file)
@@ -77,6 +77,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
@@ -108,8 +109,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -1434,9 +1437,11 @@ public class ShardManagerTest extends AbstractActorTest {
         final ActorSystem system2 = newActorSystem("Member2");
         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 
+        final String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class).
+                TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+                        new AddServerReply(ServerChangeStatus.OK, memberId2)).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
@@ -1450,7 +1455,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
 
-            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+
             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
@@ -1470,8 +1475,6 @@ public class ShardManagerTest extends AbstractActorTest {
             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
 
             //construct a mock response message
-            AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
-            mockShardLeaderActor.underlyingActor().updateResponse(response);
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
                 AddServer.class);
@@ -1504,7 +1507,7 @@ public class ShardManagerTest extends AbstractActorTest {
             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
-                    Props.create(MockRespondActor.class, addServerReply), leaderId);
+                    Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
 
             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
 
@@ -1667,7 +1670,8 @@ public class ShardManagerTest extends AbstractActorTest {
             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
 
             final TestActorRef<MockRespondActor> respondActor =
-                    TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+                            new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
 
             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
 
@@ -1678,7 +1682,6 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), respondActor);
 
-            respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
             assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
@@ -1711,8 +1714,10 @@ public class ShardManagerTest extends AbstractActorTest {
         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 
         String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
+        final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+                TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+                        new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
 
         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
 
@@ -1746,7 +1751,6 @@ public class ShardManagerTest extends AbstractActorTest {
             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
 
-            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
@@ -1763,8 +1767,6 @@ public class ShardManagerTest extends AbstractActorTest {
             leaderShardManager.underlyingActor().waitForMemberUp();
 
             //construct a mock response message
-            RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
-            mockShardLeaderActor.underlyingActor().updateResponse(response);
             newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
                     RemoveServer.class);
@@ -1972,6 +1974,61 @@ public class ShardManagerTest extends AbstractActorTest {
         LOG.info("testShutDown ending");
     }
 
+    @Test
+    public void testChangeServersVotingStatus() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            TestActorRef<MockRespondActor> respondActor =
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+                            new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), respondActor);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION), getRef());
+            shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+                    RaftState.Leader.name())), respondActor);
+
+            shardManager.tell(new ChangeShardMembersVotingStatus("default",
+                    ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+            ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+                    respondActor, ChangeServersVotingStatus.class);
+            assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+                    ImmutableMap.of(new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+            expectMsgClass(duration("5 seconds"), Success.class);
+        }};
+    }
+
+    @Test
+    public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            TestActorRef<MockRespondActor> respondActor =
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+                            new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), respondActor);
+            shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+            shardManager.tell(new ChangeShardMembersVotingStatus("default",
+                    ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+            MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+            Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+            assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+        }};
+    }
+
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
@@ -2185,35 +2242,25 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class MockRespondActor extends MessageCollectorActor {
         static final String CLEAR_RESPONSE = "clear-response";
-        static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
 
-        private volatile Object responseMsg;
+        private Object responseMsg;
+        private final Class<?> requestClass;
 
         @SuppressWarnings("unused")
-        public MockRespondActor() {
-        }
-
-        @SuppressWarnings("unused")
-        public MockRespondActor(Object responseMsg) {
+        public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+            this.requestClass = requestClass;
             this.responseMsg = responseMsg;
         }
 
-        public void updateResponse(Object response) {
-            responseMsg = response;
-        }
-
         @Override
         public void onReceive(Object message) throws Exception {
-            if(!"get-all-messages".equals(message)) {
-                LOG.debug("Received message : {}", message);
-            }
-            super.onReceive(message);
-            if (message instanceof AddServer && responseMsg != null) {
-                getSender().tell(responseMsg, getSelf());
-            } else if(message instanceof RemoveServer && responseMsg != null){
-                getSender().tell(responseMsg, getSelf());
-            } else if(message.equals(CLEAR_RESPONSE)) {
+            if(message.equals(CLEAR_RESPONSE)) {
                 responseMsg = null;
+            } else {
+                super.onReceive(message);
+                if (message.getClass().equals(requestClass) && responseMsg != null) {
+                    getSender().tell(responseMsg, getSelf());
+                }
             }
         }
     }
index 9ac5c7af61d635a71cf040094efb3605760e0cc0..2855e1e5592c113d1f4c3e368a72465236f2173b 100644 (file)
@@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
@@ -22,18 +23,21 @@ import akka.actor.PoisonPill;
 import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.net.URI;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
@@ -46,8 +50,14 @@ import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -57,10 +67,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -129,8 +144,6 @@ public class ClusterAdminRpcServiceTest {
                 get(5, TimeUnit.SECONDS);
         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
         assertEquals("getErrors", 1, rpcResult.getErrors().size());
-
-        service.close();
     }
 
     private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
@@ -227,8 +240,6 @@ public class ClusterAdminRpcServiceTest {
         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
         verifyFailedRpcResult(rpcResult);
-
-        service.close();
     }
 
     private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
@@ -276,8 +287,6 @@ public class ClusterAdminRpcServiceTest {
         verifySuccessfulRpcResult(rpcResult);
 
         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
-
-        service.close();
     }
 
     private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
@@ -329,7 +338,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service3.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
@@ -354,7 +362,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
@@ -392,7 +399,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
             @Override
@@ -454,8 +460,6 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
-
-        service.close();
     }
 
     @Test
@@ -516,18 +520,403 @@ public class ClusterAdminRpcServiceTest {
         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+    }
+
+    @Test
+    public void testChangeMemberVotingStatesForShard() throws Exception {
+        String name = "testChangeMemberVotingStatusForShard";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<Void> rpcResult = service3.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
 
-        service3.close();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
     }
 
     @Test
-    public void testConvertMembersToVotingForAllShards() {
-        // TODO implement
+    public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
+        String name = "testChangeMemberVotingStatesForSingleNodeShard";
+        String moduleShardsConfig = "module-shards-member1.conf";
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        leaderNode.configDataStore().waitTillReady();
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+                leaderNode.operDataStore());
+
+        RpcResult<Void> rpcResult = service.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifyFailedRpcResult(rpcResult);
+
+        verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true));
     }
 
     @Test
-    public void testConvertMembersToNonvotingForAllShards() {
-        // TODO implement
+    public void testChangeMemberVotingStatesForAllShards() throws Exception {
+        String name = "testChangeMemberVotingStatesForAllShards";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
+                new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+    }
+
+    @Test
+    public void testFlipMemberVotingStates() throws Exception {
+        String name = "testFlipMemberVotingStates";
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", true));
+
+        // Leadership should have transferred to member 3 since it is the only remaining voting member.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                        raftState.getLeader().contains("member-3"));
+            }
+        });
+
+        verifyRaftState(leaderNode1.operDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                        raftState.getLeader().contains("member-3"));
+            }
+        });
+
+        // Flip the voting states back to the original states.
+
+        rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+        result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", false));
+
+        // Leadership should have transferred to member 1 or 2.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+                        raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+            }
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+        String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+        // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+        // non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+                new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+                new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+        replicaNode1.waitForMembersUp("member-2", "member-3");
+
+        verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+                new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+                new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+                new SimpleEntry<>("member-6", true));
+
+        verifyRaftState(replicaNode1.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState());
+            }
+        });
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+                replicaNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+                new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+        // Since member 1 was changed to voting and there was no leader, it should've started and election
+        // and become leader
+        verifyRaftState(replicaNode1.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                        raftState.getLeader().contains("member-1"));
+            }
+        });
+
+        verifyRaftState(replicaNode1.operDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                        raftState.getLeader().contains("member-1"));
+            }
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+        String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+        // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+                new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+                new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+                new SimpleEntry<>("member-6", false));
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+                new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+        // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+        // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNotNull("Expected non-null leader Id", raftState.getLeader());
+                assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+            }
+        });
+    }
+
+    private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+            String member, String datastoreTypeSuffix, String... shards) {
+        String[] datastoreTypes = {"config_", "oper_"};
+        for(String type: datastoreTypes) {
+            for(String shard: shards) {
+                List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+                for(ServerInfo info: serverConfig.getServerConfig()) {
+                    newServerInfo.add(new ServerInfo(new ShardIdentifier(shard, info.getId(),
+                            type + datastoreTypeSuffix).toString(), info.isVoting()));
+                }
+
+                String shardID = new ShardIdentifier(shard, member, type + datastoreTypeSuffix).toString();
+                InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+                InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+                        new ServerConfigurationPayload(newServerInfo)));
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        for(DistributedDataStore datastore: datastores) {
+            for(String shard: shards) {
+                verifyVotingStates(datastore, shard, expStates);
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(final DistributedDataStore datastore, final String shardName,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        final String localMemberName = datastore.getActorContext().getCurrentMemberName();
+        final Map<String, Boolean> expStateMap = new HashMap<>();
+        for(Entry<String, Boolean> e: expStates) {
+            expStateMap.put(new ShardIdentifier(shardName, e.getKey(),
+                    datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+        }
+
+        verifyRaftState(datastore, shardName, new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                String localPeerId = new ShardIdentifier(shardName, localMemberName,
+                        datastore.getActorContext().getDataStoreName()).toString();
+                assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+                for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                    assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+                }
+            }
+        });
     }
 
     private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {