Fix incorrect remove call in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index aa2c524fe7cf5acd3c0b49fc8f210df013ff6e44..8386e669b83f46cb102771f7c12f1b9458f5e485 100644 (file)
@@ -18,6 +18,7 @@ import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
 import akka.actor.SupervisorStrategy;
 import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
@@ -40,13 +41,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
@@ -63,10 +64,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -81,12 +84,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -189,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
         } else if (message instanceof ClusterEvent.MemberExited){
             memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
         } else if (message instanceof ClusterEvent.MemberExited){
             memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -220,9 +229,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
         } else if(message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
         } else if(message instanceof WrappedShardResponse){
         } else if(message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
         } else if(message instanceof WrappedShardResponse){
@@ -231,6 +237,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
+        } else if(message instanceof ChangeShardMembersVotingStatus){
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if(message instanceof FlipShardMembersVotingStatus){
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
         } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if(message instanceof SaveSnapshotFailure) {
         } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if(message instanceof SaveSnapshotFailure) {
@@ -240,6 +250,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onShutDown();
         } else if (message instanceof GetLocalShardIds) {
             onGetLocalShardIds();
             onShutDown();
         } else if (message instanceof GetLocalShardIds) {
             onGetLocalShardIds();
+        } else if(message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
         } else {
             unknownMessage(message);
         }
         } else {
             unknownMessage(message);
         }
@@ -295,7 +307,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
 
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
-        shardReplicaOperationsInProgress.remove(shardId);
+        shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
 
         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
@@ -312,16 +324,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
-        }
-    }
-
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
         if(isShardReplicaOperationInProgress(shardName, sender)) {
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
         if(isShardReplicaOperationInProgress(shardName, sender)) {
@@ -347,6 +349,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
@@ -601,9 +604,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         String actorName = sender.path().name();
         //find shard name from actor name; actor name is stringified shardId
 
         String actorName = sender.path().name();
         //find shard name from actor name; actor name is stringified shardId
-        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
 
 
-        if (shardId.getShardName() == null) {
+        final ShardIdentifier shardId;
+        try {
+            shardId = ShardIdentifier.fromShardIdString(actorName);
+        } catch (IllegalArgumentException e) {
+            LOG.debug("{}: ignoring actor {}", actorName, e);
             return;
         }
 
             return;
         }
 
@@ -730,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -743,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberExited(ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
     private void memberExited(ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -756,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberUp(ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
     private void memberUp(ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
 
+    private void memberUp(MemberName memberName, Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
         checkReady();
     }
 
+    private void memberWeaklyUp(MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
     private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
     private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
@@ -778,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
 
         addPeerAddress(memberName, message.member().address());
 
@@ -787,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
 
         markMemberUnavailable(memberName);
     }
@@ -1076,7 +1094,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
             }
 
             @Override
             }
 
             @Override
@@ -1224,12 +1242,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
             }
         });
     }
             }
         });
     }
@@ -1281,6 +1303,140 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             0, 0));
     }
 
             0, 0));
     }
 
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+        String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+                    e.getValue());
+        }
+
+        ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+        findLocalShard(shardName, getSender(),
+                localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+                        localShardFound.getPath(), getSender()));
+    }
+
+    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+        ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, localShardFound -> {
+            Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                    Timeout.apply(30, TimeUnit.SECONDS));
+
+            future.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) {
+                    if (failure != null) {
+                        sender.tell(new Status.Failure(new RuntimeException(
+                                String.format("Failed to access local shard %s", shardName), failure)), self());
+                        return;
+                    }
+
+                    OnDemandRaftState raftState = (OnDemandRaftState) response;
+                    Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                    for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                        serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                    }
+
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+                            toString(), !raftState.isVoting());
+
+                    changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                            shardName, localShardFound.getPath(), sender);
+                }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
+
+    }
+
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Consumer<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if(response instanceof LocalShardFound) {
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
+                    } else if(response instanceof LocalShardNotFound) {
+                        String msg = String.format("Local shard %s does not exist", shardName);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+                    } else {
+                        String msg = String.format("Failed to find local shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+                                new RuntimeException(msg)), self());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
+
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path());
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member must be voting",
+                                shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
+
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private static final class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
     private static final class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
@@ -1362,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private static interface RunnableMessage extends Runnable {
+    }
+
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
      * a remote or local find primary message is processed
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
      * a remote or local find primary message is processed
@@ -1443,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
-        }
-    }
-
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */