Implement cluster admin RPCs to change member voting states 86/38086/6
authorTom Pantelis <tpanteli@brocade.com>
Wed, 20 Apr 2016 15:41:25 +0000 (11:41 -0400)
committerAnil Vishnoi <vishnoianil@gmail.com>
Wed, 1 Jun 2016 16:08:04 +0000 (16:08 +0000)
Added 3 new RPCs for changing voting states:
  change-member-voting-states-for-shard
  change-member-voting-states-for-all-shards
  flip-member-voting-states-for-all-shards

These replace the original ones added in Be that weren't implemented.
They were added as placeholders based on how it was thought it would
work at that time.

New related ShardManager messages were added that are sent by the
ClusterAdminRpcService.

The flip-member-voting-states-for-all-shards RPC is a shortcut that
obtains the current voting states via the GetOnDemandRaftState message
to the RaftActor and inverts them. New fields were added to the
OnDemandRaftState response to return the voting states.

Modified the ShardStats JXM bean to report the new OnDemandRaftState
fields.

Added a check in RaftActorServerConfigurationSupport to ensure that
there's at least 1 voting member otherwise one can end up with an
unusable shard with no ability to elect a leader.

Fixed a couple bugs in Leader and AbstractLeader that were found during
testing. AbstractLeader needs to take into account the follower's voting
state when determining if the leader is isolated.

Change-Id: I58686e3ce94d58de7cf289e55bb717ba46bc1de5
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-cluster-admin/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-cluster-admin/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java

index 350fa54..b207e0b 100644 (file)
@@ -409,8 +409,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Debugging message to retrieve raft stats.
 
         Map<String, String> peerAddresses = new HashMap<>();
-        for(String peerId: context.getPeerIds()) {
-            peerAddresses.put(peerId, context.getPeerAddress(peerId));
+        Map<String, Boolean> peerVotingStates = new HashMap<>();
+        for(PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.getVotingState() != VotingState.NON_VOTING);
+            peerAddresses.put(info.getId(), info.getAddress());
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
@@ -429,7 +431,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
+                .isVoting(context.isVotingMember())
                 .peerAddresses(peerAddresses)
+                .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
         ReplicatedLogEntry lastLogEntry = replicatedLog().last();
index 4cad0d0..f76c7d7 100644 (file)
@@ -681,7 +681,8 @@ class RaftActorServerConfigurationSupport {
             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
                     getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
             if (succeeded && localServerChangedToNonVoting) {
-               raftActor.becomeNonVoting();
+                LOG.debug("Leader changed to non-voting - trying leadership transfer");
+                raftActor.becomeNonVoting();
             }
         }
 
@@ -707,9 +708,7 @@ class RaftActorServerConfigurationSupport {
 
             if(tryToElectLeader) {
                 initiateLocalLeaderElection();
-            } else {
-                updateLocalPeerInfo();
-
+            } else if(updateLocalPeerInfo()) {
                 persistNewServerConfiguration(changeVotingStatusContext);
             }
         }
@@ -718,21 +717,39 @@ class RaftActorServerConfigurationSupport {
             LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
 
             ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
-            updateLocalPeerInfo();
+            if(!updateLocalPeerInfo()) {
+                return;
+            }
 
             raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
 
             currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
         }
 
-        private void updateLocalPeerInfo() {
+        private boolean updateLocalPeerInfo() {
             List<ServerInfo> newServerInfoList = newServerInfoList();
 
+            // Check if new voting state would leave us with no voting members.
+            boolean atLeastOneVoting = false;
+            for(ServerInfo info: newServerInfoList) {
+                if(info.isVoting()) {
+                    atLeastOneVoting = true;
+                    break;
+                }
+            }
+
+            if(!atLeastOneVoting) {
+                operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
+                return false;
+            }
+
             raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
             if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
                 leader.updateMinReplicaCount();
             }
+
+            return true;
         }
 
         private List<ServerInfo> newServerInfoList() {
index 6779414..2fe3bfd 100644 (file)
@@ -775,10 +775,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected boolean isLeaderIsolated() {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            if (followerLogInformation.isFollowerActive()) {
+            final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
-                    break;
+                    return false;
                 }
             }
         }
index 36e9b64..77853f3 100644 (file)
@@ -161,7 +161,9 @@ public class Leader extends AbstractLeader {
     @Override
     public void close() {
         if(leadershipTransferContext != null) {
-            leadershipTransferContext.transferCohort.abortTransfer();
+            LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
+            leadershipTransferContext = null;
+            localLeadershipTransferContext.transferCohort.abortTransfer();
         }
 
         super.close();
index 57f8beb..0bd85b1 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft.client.messages;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -34,9 +35,11 @@ public class OnDemandRaftState {
     private String votedFor;
     private boolean isSnapshotCaptureInitiated;
     private String customRaftPolicyClassName;
+    private boolean isVoting;
 
     private List<FollowerInfo> followerInfoList = Collections.emptyList();
     private Map<String, String> peerAddresses = Collections.emptyMap();
+    private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
 
     private OnDemandRaftState() {
     }
@@ -109,6 +112,10 @@ public class OnDemandRaftState {
         return isSnapshotCaptureInitiated;
     }
 
+    public boolean isVoting() {
+        return isVoting;
+    }
+
     public List<FollowerInfo> getFollowerInfoList() {
         return followerInfoList;
     }
@@ -117,6 +124,10 @@ public class OnDemandRaftState {
         return peerAddresses;
     }
 
+    public Map<String, Boolean> getPeerVotingStates() {
+        return peerVotingStates;
+    }
+
     public String getCustomRaftPolicyClassName() {
         return customRaftPolicyClassName;
     }
@@ -199,6 +210,11 @@ public class OnDemandRaftState {
             return this;
         }
 
+        public Builder isVoting(boolean isVoting) {
+            stats.isVoting = isVoting;
+            return this;
+        }
+
         public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
             stats.followerInfoList = followerInfoList;
             return this;
@@ -209,6 +225,11 @@ public class OnDemandRaftState {
             return this;
         }
 
+        public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
+            stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+            return this;
+        }
+
         public Builder isSnapshotCaptureInitiated(boolean value) {
             stats.isSnapshotCaptureInitiated = value;
             return this;
index 8f6a370..ca6f34c 100644 (file)
@@ -48,4 +48,9 @@ public enum ServerChangeStatus {
      * An unsupported request, for example removing the leader in a single node cluster.
      */
     NOT_SUPPORTED,
+
+    /**
+     * Some part of the request is invalid.
+     */
+    INVALID_REQUEST,
 }
index 02ba015..1a43dfe 100644 (file)
@@ -1085,6 +1085,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         LOG.info("testChangeLeaderToNonVoting ending");
     }
 
+    @Test
+    public void testChangeLeaderToNonVotingInSingleNode() {
+        LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
+                        withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+
+        leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
+
+        LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
+    }
+
     @Test
     public void testChangeToVotingWithNoLeader() {
         LOG.info("testChangeToVotingWithNoLeader starting");
index 0937f65..c99c85f 100644 (file)
@@ -1836,6 +1836,25 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertTrue(behavior instanceof Leader);
     }
 
+    @Test
+    public void testIsolatedLeaderCheckNoVotingFollowers() {
+        logStart("testIsolatedLeaderCheckNoVotingFollowers");
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
+
+        leader = new Leader(leaderActorContext);
+        leader.getFollower(FOLLOWER_ID).markFollowerActive();
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", behavior instanceof Leader);
+    }
+
     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
index d52a154..617120a 100644 (file)
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -31,25 +33,30 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -63,14 +70,13 @@ import org.slf4j.LoggerFactory;
  *
  * @author Thomas Pantelis
  */
-public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+public class ClusterAdminRpcService implements ClusterAdminService {
     private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
-    private RpcRegistration<ClusterAdminService> rpcRegistration;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
             DistributedDataStoreInterface operDataStore) {
@@ -78,19 +84,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         this.operDataStore = operDataStore;
     }
 
-    public void start(RpcProviderRegistry rpcProviderRegistry) {
-        LOG.debug("ClusterAdminRpcService starting");
-
-        rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
-    }
-
-    @Override
-    public void close() {
-        if(rpcRegistration != null) {
-            rpcRegistration.close();
-        }
-    }
-
     @Override
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
@@ -168,22 +161,14 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.info("Adding replicas for all shards");
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new AddShardReplica(shardName);
-            }
-        };
+        Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
-            @Override
-            public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
-                return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
-            }
-        }, "Failed to add replica");
+        return waitForShardResults(shardResultData, shardResults ->
+                new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to add replica");
     }
 
 
@@ -197,37 +182,95 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         }
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new RemoveShardReplica(shardName, MemberName.forName(memberName));
-            }
-        };
+        Function<String, Object> messageSupplier = shardName ->
+                new RemoveShardReplica(shardName, MemberName.forName(memberName));
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+        return waitForShardResults(shardResultData, shardResults ->
+                new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
+        "       Failed to remove replica");
+    }
+
+    @Override
+    public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if(dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName,
+                memberVotingStates);
+
+        LOG.info("Change member voting states for shard {}: {}", shardName,
+                changeVotingStatus.getMeberVotingStatusMap());
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
+        Futures.addCallback(future, new FutureCallback<Success>() {
             @Override
-            public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
-                return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+            public void onSuccess(Success success) {
+                LOG.info("Successfully changed member voting states for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
             }
-        }, "Failed to add replica");
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+            final ChangeMemberVotingStatesForAllShardsInput input) {
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                toChangeShardMembersVotingStatus(shardName, memberVotingStates);
+
+        LOG.info("Change member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
-            ConvertMembersToNonvotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                new FlipShardMembersVotingStatus(shardName);
+
+        LOG.info("Flip member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
@@ -255,6 +298,18 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
+    private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+            List<MemberVotingState> memberVotingStatus) {
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(MemberVotingState memberStatus: memberVotingStatus) {
+            serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
+                serverVotingStatusMap);
+        return changeVotingStatus;
+    }
+
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
             final Function<List<ShardResult>, T> resultDataSupplier,
index 2d81db7..40fb5fb 100644 (file)
@@ -47,7 +47,21 @@ module cluster-admin {
             description "The list of results, one per shard";
         }
     }
-    
+
+    grouping member-voting-states-input {
+        list member-voting-state {
+            leaf member-name {
+                type string;
+            }
+
+            leaf voting {
+                type boolean;
+            }
+
+            description "The list of member voting states";
+        }
+    }
+
     rpc add-shard-replica {
         input {
             leaf shard-name {
@@ -112,40 +126,62 @@ module cluster-admin {
                 description "The cluster member from which the shard replicas should be removed";
             }
         }
-        
+
         output {
             uses shard-result-output;
         }
-        
+
         description "Removes replicas for all shards on this node. This is equivalent to issuing
             a remove-shard-replica for all shards and essentially removes this node from a cluster.";
     }
 
-    rpc convert-members-to-nonvoting-for-all-shards {
+    rpc change-member-voting-states-for-shard {
         input {
-            leaf-list member-names {
+            leaf shard-name {
+                mandatory true;
                 type string;
-                description "The names of the cluster members to convert.";
+                description "The name of the shard for which to change voting state.";
             }
+
+            leaf data-store-type {
+                mandatory true;
+                type data-store-type;
+                description "The type of the data store to which the shard belongs";
+            }
+
+            uses member-voting-states-input;
         }
-        
-        description "Converts the given cluster members to non-voting for all shards. The members will no 
-            longer participate in leader elections and consensus but will be replicated. This is useful for
-            having a set of members serve as a backup cluster in case the primary voting cluster suffers
-            catastrophic failure. This RPC can be issued to any cluster member and will be forwarded
-            to the leader.";
+
+        description "Changes the voting states, either voting or non-voting, of cluster members for a shard.
+            Non-voting members will no longer participate in leader elections and consensus but will be
+            replicated. This is useful for having a set of members serve as a backup cluster in case the
+            primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+            and will be forwarded to the leader.";
     }
 
-    rpc convert-members-to-voting-for-all-shards {
+    rpc change-member-voting-states-for-all-shards {
         input {
-            leaf-list member-names {
-                type string;
-                description "The names of the cluster members to convert.";
-            }
+            uses member-voting-states-input;
+        }
+
+        output {
+            uses shard-result-output;
+        }
+
+        description "Changes the voting states, either voting or non-voting, of cluster members for all shards.
+            Non-voting members will no longer participate in leader elections and consensus but will be
+            replicated. This is useful for having a set of members serve as a backup cluster in case the
+            primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+            and will be forwarded to the leader.";
+    }
+
+    rpc flip-member-voting-states-for-all-shards {
+        output {
+            uses shard-result-output;
         }
 
-        description "Converts the given cluster members to voting for all shards. The members will 
-            participate in leader elections and consensus.";
+        description "Flips the voting states of all cluster members for all shards, such that if a member
+            was voting it becomes non-voting and vice versa.";
     }
 
     rpc backup-datastore {
index 1c78622..8ec173d 100644 (file)
@@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
@@ -22,18 +23,21 @@ import akka.actor.PoisonPill;
 import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.net.URI;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
@@ -47,8 +51,14 @@ import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -58,10 +68,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -133,8 +148,6 @@ public class ClusterAdminRpcServiceTest {
                 get(5, TimeUnit.SECONDS);
         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
         assertEquals("getErrors", 1, rpcResult.getErrors().size());
-
-        service.close();
     }
 
     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
@@ -231,8 +244,6 @@ public class ClusterAdminRpcServiceTest {
         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
         verifyFailedRpcResult(rpcResult);
-
-        service.close();
     }
 
     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
@@ -280,8 +291,6 @@ public class ClusterAdminRpcServiceTest {
         verifySuccessfulRpcResult(rpcResult);
 
         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
-
-        service.close();
     }
 
     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
@@ -333,7 +342,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service3.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
@@ -358,7 +366,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
@@ -396,7 +403,6 @@ public class ClusterAdminRpcServiceTest {
                 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
                         get(10, TimeUnit.SECONDS);
         verifySuccessfulRpcResult(rpcResult);
-        service1.close();
 
         verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
             @Override
@@ -458,8 +464,6 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
-
-        service.close();
     }
 
     @Test
@@ -520,18 +524,379 @@ public class ClusterAdminRpcServiceTest {
         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+    }
+
+    @Test
+    public void testChangeMemberVotingStatesForShard() throws Exception {
+        String name = "testChangeMemberVotingStatusForShard";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
 
-        service3.close();
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<Void> rpcResult = service3.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
+
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
+        verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
     }
 
     @Test
-    public void testConvertMembersToVotingForAllShards() {
-        // TODO implement
+    public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
+        String name = "testChangeMemberVotingStatesForSingleNodeShard";
+        String moduleShardsConfig = "module-shards-member1.conf";
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        leaderNode.configDataStore().waitTillReady();
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+                leaderNode.operDataStore());
+
+        RpcResult<Void> rpcResult = service.changeMemberVotingStatesForShard(
+                new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+                    setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        verifyFailedRpcResult(rpcResult);
+
+        verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true));
+    }
+
+    @Test
+    public void testChangeMemberVotingStatesForAllShards() throws Exception {
+        String name = "testChangeMemberVotingStatesForAllShards";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        // Invoke RPC service on member-3 to change voting status
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
+                new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
+                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+                get(10, TimeUnit.SECONDS);
+        ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false));
     }
 
     @Test
-    public void testConvertMembersToNonvotingForAllShards() {
-        // TODO implement
+    public void testFlipMemberVotingStates() throws Exception {
+        String name = "testFlipMemberVotingStates";
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", true));
+
+        // Leadership should have transferred to member 3 since it is the only remaining voting member.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-3"));
+        });
+
+        verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-3"));
+        });
+
+        // Flip the voting states back to the original states.
+
+        rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+        result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", false));
+
+        // Leadership should have transferred to member 1 or 2.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+        String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+        // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+        // non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+                new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+                new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+        replicaNode1.waitForMembersUp("member-2", "member-3");
+
+        verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+                new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+                new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+                new SimpleEntry<>("member-6", true));
+
+        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
+            assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+                replicaNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+                new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+                new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+        // Since member 1 was changed to voting and there was no leader, it should've started and election
+        // and become leader
+        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1"));
+        });
+
+        verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+                    raftState.getLeader().contains("member-1"));
+        });
+    }
+
+    @Test
+    public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+        String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+        // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+                new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+                new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        leaderNode1.operDataStore().waitTillReady();
+        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+                new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+                new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+                new SimpleEntry<>("member-6", false));
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore());
+
+        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+                get(10, TimeUnit.SECONDS);
+        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational));
+
+        // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+                new String[]{"cars", "people"},
+                new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+                new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+                new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+        // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+        // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+            assertNotNull("Expected non-null leader Id", raftState.getLeader());
+            assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+        });
+    }
+
+    private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+            String member, String datastoreTypeSuffix, String... shards) {
+        String[] datastoreTypes = {"config_", "oper_"};
+        for(String type: datastoreTypes) {
+            for(String shard: shards) {
+                List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+                for(ServerInfo info: serverConfig.getServerConfig()) {
+                    newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
+                            type + datastoreTypeSuffix).toString(), info.isVoting()));
+                }
+
+                String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+                        type + datastoreTypeSuffix).toString();
+                InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+                InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+                        new ServerConfigurationPayload(newServerInfo)));
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        for(DistributedDataStore datastore: datastores) {
+            for(String shard: shards) {
+                verifyVotingStates(datastore, shard, expStates);
+            }
+        }
+    }
+
+    @SafeVarargs
+    private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+            SimpleEntry<String, Boolean>... expStates) throws Exception {
+        String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
+        Map<String, Boolean> expStateMap = new HashMap<>();
+        for(Entry<String, Boolean> e: expStates) {
+            expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
+                    datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+        }
+
+        verifyRaftState(datastore, shardName, raftState -> {
+            String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
+                    datastore.getActorContext().getDataStoreName()).toString();
+            assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+            for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+            }
+        });
     }
 
     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
diff --git a/opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties
new file mode 100644 (file)
index 0000000..3173698
--- /dev/null
@@ -0,0 +1,10 @@
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.utils.ActorContext=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorServerConfigurationSupport=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
index 0f49157..b01eb09 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -194,6 +195,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     public String getVotedFor() {
         return getOnDemandRaftState().getVotedFor();
     }
+    @Override
+    public boolean isVoting() {
+        return getOnDemandRaftState().isVoting();
+    }
+
+    @Override
+    public String getPeerVotingStates() {
+        return toStringMap(getOnDemandRaftState().getPeerVotingStates());
+    }
 
     @Override
     public boolean isSnapshotCaptureInitiated() {
@@ -302,17 +312,11 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     @Override
     public String getPeerAddresses() {
-        StringBuilder builder = new StringBuilder();
-        int i = 0;
-        for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
-            if(i++ > 0) {
-                builder.append(", ");
-            }
-
-            builder.append(e.getKey()).append(": ").append(e.getValue());
-        }
+        return toStringMap(getOnDemandRaftState().getPeerAddresses());
+    }
 
-        return builder.toString();
+    private static String toStringMap(Map<?, ?> map) {
+        return Joiner.on(", ").withKeyValueSeparator(": ").join(map);
     }
 
     @Override
@@ -347,6 +351,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         return shard.getPendingTxCommitQueueSize();
     }
 
+    @Override
     public int getTxCohortCacheSize() {
         return shard.getCohortCacheSize();
     }
@@ -357,5 +362,4 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
             shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender());
         }
     }
-
 }
index 39cc22f..1ae58e2 100644 (file)
@@ -66,6 +66,8 @@ public interface ShardStatsMXBean {
 
    boolean isSnapshotCaptureInitiated();
 
+   boolean isVoting();
+
    void resetTransactionCounters();
 
    long getInMemoryJournalDataSize();
@@ -78,6 +80,8 @@ public interface ShardStatsMXBean {
 
    String getPeerAddresses();
 
+   String getPeerVotingStates();
+
    long getLeadershipChangeCount();
 
    String getLastLeadershipChangeTime();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java
new file mode 100644 (file)
index 0000000..0846f7b
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+
+/**
+ * A local message sent to the ShardManager to change the raft voting status for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class ChangeShardMembersVotingStatus {
+    private final String shardName;
+    private final Map<String, Boolean> meberVotingStatusMap;
+
+    public ChangeShardMembersVotingStatus(String shardName, Map<String, Boolean> meberVotingStatusMap) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.meberVotingStatusMap = ImmutableMap.copyOf(meberVotingStatusMap);
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    public Map<String, Boolean> getMeberVotingStatusMap() {
+        return meberVotingStatusMap;
+    }
+
+    @Override
+    public String toString() {
+        return "ChangeShardMembersVotingStatus [shardName=" + shardName + ", meberVotingStatusMap="
+                + meberVotingStatusMap + "]";
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java
new file mode 100644 (file)
index 0000000..24859a4
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A local message sent to the ShardManager to flip the raft voting states for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class FlipShardMembersVotingStatus {
+    private final String shardName;
+
+    public FlipShardMembersVotingStatus(String shardName) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    @Override
+    public String toString() {
+        return "FlipShardMembersVotingStatus [shardName=" + shardName + "]";
+    }
+}
index 04c64dd..0f6425e 100644 (file)
@@ -40,13 +40,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
@@ -63,10 +63,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -81,12 +83,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -220,9 +226,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
         } else if(message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
         } else if(message instanceof WrappedShardResponse){
@@ -231,6 +234,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
+        } else if(message instanceof ChangeShardMembersVotingStatus){
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if(message instanceof FlipShardMembersVotingStatus){
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
         } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if(message instanceof SaveSnapshotFailure) {
@@ -240,6 +247,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onShutDown();
         } else if (message instanceof GetLocalShardIds) {
             onGetLocalShardIds();
+        } else if(message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
         } else {
             unknownMessage(message);
         }
@@ -312,16 +321,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
-        }
-    }
-
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
         if(isShardReplicaOperationInProgress(shardName, sender)) {
@@ -347,6 +346,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
+                    shardReplicaOperationsInProgress.add(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
@@ -1079,7 +1079,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                getSelf().tell(new RunnableMessage() {
+                    @Override
+                    public void run() {
+                        addShard(getShardName(), response, getSender());
+                    }
+                }, getTargetActor());
             }
 
             @Override
@@ -1227,12 +1232,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell(new RunnableMessage() {
+                    @Override
+                    public void run() {
+                        removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
+                    }
+                }, getTargetActor());
             }
         });
     }
@@ -1284,6 +1298,145 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             0, 0));
     }
 
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+        String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+                    e.getValue());
+        }
+
+        ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+        findLocalShard(shardName, getSender(),
+                localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+                        localShardFound.getPath(), getSender()));
+    }
+
+    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+        ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, localShardFound -> {
+            Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                    Timeout.apply(30, TimeUnit.SECONDS));
+
+            future.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) {
+                    if (failure != null) {
+                        sender.tell(new Status.Failure(new RuntimeException(
+                                String.format("Failed to access local shard %s", shardName), failure)), self());
+                        return;
+                    }
+
+                    OnDemandRaftState raftState = (OnDemandRaftState) response;
+                    Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                    for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                        serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                    }
+
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+                            toString(), !raftState.isVoting());
+
+                    changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                            shardName, localShardFound.getPath(), sender);
+                }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
+
+    }
+
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Consumer<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if(response instanceof LocalShardFound) {
+                        getSelf().tell(new RunnableMessage() {
+                            @Override
+                            public void run() {
+                                onLocalShardFound.accept((LocalShardFound) response);
+                            }
+                        }, sender);
+                    } else if(response instanceof LocalShardNotFound) {
+                        String msg = String.format("Local shard %s does not exist", shardName);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+                    } else {
+                        String msg = String.format("Failed to find local shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+                                new RuntimeException(msg)), self());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
+
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path());
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member must be voting",
+                                shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
+
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private static final class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
@@ -1365,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private static interface RunnableMessage extends Runnable {
+    }
+
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
      * a remote or local find primary message is processed
@@ -1446,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
-        }
-    }
-
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */
index f0f051f..3b81412 100644 (file)
@@ -86,6 +86,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
@@ -116,8 +117,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -1442,9 +1445,11 @@ public class ShardManagerTest extends AbstractActorTest {
         final ActorSystem system2 = newActorSystem("Member2");
         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 
+        String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class).
+                TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+                        new AddServerReply(ServerChangeStatus.OK, memberId2)).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
@@ -1458,7 +1463,6 @@ public class ShardManagerTest extends AbstractActorTest {
 
             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
 
-            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
@@ -1478,8 +1482,6 @@ public class ShardManagerTest extends AbstractActorTest {
             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
 
             //construct a mock response message
-            AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
-            mockShardLeaderActor.underlyingActor().updateResponse(response);
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
                 AddServer.class);
@@ -1512,7 +1514,7 @@ public class ShardManagerTest extends AbstractActorTest {
             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
-                    Props.create(MockRespondActor.class, addServerReply), leaderId);
+                    Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
 
             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
 
@@ -1675,7 +1677,8 @@ public class ShardManagerTest extends AbstractActorTest {
             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
 
             final TestActorRef<MockRespondActor> respondActor =
-                    TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+                            new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
 
             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
 
@@ -1686,7 +1689,6 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), respondActor);
 
-            respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
             assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
@@ -1719,8 +1721,10 @@ public class ShardManagerTest extends AbstractActorTest {
         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 
         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
+        String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         final TestActorRef<MockRespondActor> mockShardLeaderActor =
-                TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+                TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+                        new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
 
         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
 
@@ -1754,7 +1758,6 @@ public class ShardManagerTest extends AbstractActorTest {
             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
 
-            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
@@ -1771,8 +1774,6 @@ public class ShardManagerTest extends AbstractActorTest {
             leaderShardManager.underlyingActor().waitForMemberUp();
 
             //construct a mock response message
-            RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
-            mockShardLeaderActor.underlyingActor().updateResponse(response);
             newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
                     RemoveServer.class);
@@ -1977,6 +1978,62 @@ public class ShardManagerTest extends AbstractActorTest {
         LOG.info("testShutDown ending");
     }
 
+    @Test
+    public void testChangeServersVotingStatus() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            TestActorRef<MockRespondActor> respondActor =
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+                            new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), respondActor);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+                    DataStoreVersions.CURRENT_VERSION), getRef());
+            shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+                    RaftState.Leader.name())), respondActor);
+
+            shardManager.tell(new ChangeShardMembersVotingStatus("default",
+                    ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+            ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+                    respondActor, ChangeServersVotingStatus.class);
+            assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+                    ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
+                            shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+            expectMsgClass(duration("5 seconds"), Success.class);
+        }};
+    }
+
+    @Test
+    public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            TestActorRef<MockRespondActor> respondActor =
+                    actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+                            new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), respondActor);
+            shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+            shardManager.tell(new ChangeShardMembersVotingStatus("default",
+                    ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+            MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+            Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+            assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+        }};
+    }
+
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
@@ -2184,35 +2241,25 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class MockRespondActor extends MessageCollectorActor {
         static final String CLEAR_RESPONSE = "clear-response";
-        static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
 
-        private volatile Object responseMsg;
+        private Object responseMsg;
+        private final Class<?> requestClass;
 
         @SuppressWarnings("unused")
-        public MockRespondActor() {
-        }
-
-        @SuppressWarnings("unused")
-        public MockRespondActor(Object responseMsg) {
+        public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+            this.requestClass = requestClass;
             this.responseMsg = responseMsg;
         }
 
-        public void updateResponse(Object response) {
-            responseMsg = response;
-        }
-
         @Override
         public void onReceive(Object message) throws Exception {
-            if(!"get-all-messages".equals(message)) {
-                LOG.debug("Received message : {}", message);
-            }
-            super.onReceive(message);
-            if (message instanceof AddServer && responseMsg != null) {
-                getSender().tell(responseMsg, getSelf());
-            } else if(message instanceof RemoveServer && responseMsg != null){
-                getSender().tell(responseMsg, getSelf());
-            } else if(message.equals(CLEAR_RESPONSE)) {
+            if(message.equals(CLEAR_RESPONSE)) {
                 responseMsg = null;
+            } else {
+                super.onReceive(message);
+                if (message.getClass().equals(requestClass) && responseMsg != null) {
+                    getSender().tell(responseMsg, getSelf());
+                }
             }
         }
     }