Fix incorrect remove call in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 197ae82..8386e66 100644 (file)
@@ -14,10 +14,12 @@ import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -27,15 +29,9 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
@@ -43,16 +39,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -65,19 +62,17 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
@@ -87,19 +82,21 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +114,7 @@ import scala.concurrent.duration.FiniteDuration;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
@@ -156,19 +153,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String persistenceId;
 
-    /**
-     */
-    protected ShardManager(AbstractBuilder<?> builder) {
-
-        this.cluster = builder.cluster;
-        this.configuration = builder.configuration;
-        this.datastoreContextFactory = builder.datastoreContextFactory;
-        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+    ShardManager(AbstractShardManagerCreator<?> builder) {
+        this.cluster = builder.getCluster();
+        this.configuration = builder.getConfiguration();
+        this.datastoreContextFactory = builder.getDatastoreContextFactory();
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = builder.primaryShardInfoCache;
-        this.restoreFromSnapshot = builder.restoreFromSnapshot;
+        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+        this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
+        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
@@ -178,12 +172,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        List<String> localShardActorNames = new ArrayList<>();
-        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
-                "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
-                localShardActorNames);
-        mBean.setShardManager(this);
+        mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+        mBean.registerMBean();
     }
 
     @Override
@@ -205,6 +196,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
         } else if (message instanceof ClusterEvent.MemberExited){
             memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -236,9 +229,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
         } else if(message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
         } else if(message instanceof WrappedShardResponse){
@@ -247,6 +237,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
+        } else if(message instanceof ChangeShardMembersVotingStatus){
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if(message instanceof FlipShardMembersVotingStatus){
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
         } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if(message instanceof SaveSnapshotFailure) {
@@ -254,6 +248,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
         } else if(message instanceof Shutdown) {
             onShutDown();
+        } else if (message instanceof GetLocalShardIds) {
+            onGetLocalShardIds();
+        } else if(message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
         } else {
             unknownMessage(message);
         }
@@ -309,31 +307,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
-        shardReplicaOperationsInProgress.remove(shardId);
+        shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
                     shardId.getShardName());
-            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+            originalSender.tell(new Status.Success(null), getSelf());
         } else {
             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
                     persistenceId(), shardId, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
-                    leaderPath, shardId);
-            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
-        }
-    }
-
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+            originalSender.tell(new Status.Failure(failure), getSelf());
         }
     }
 
@@ -362,6 +349,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
@@ -406,7 +394,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         if(notInitialized != null) {
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+            getSender().tell(new Status.Failure(new IllegalStateException(String.format(
                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
             return;
         }
@@ -433,14 +421,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String shardName = createShard.getModuleShardConfig().getShardName();
             if(localShards.containsKey(shardName)) {
                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
-                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
-                reply = new akka.actor.Status.Success(null);
+                reply = new Status.Success(null);
             }
         } catch (Exception e) {
             LOG.error("{}: onCreateShard failed", persistenceId(), e);
-            reply = new akka.actor.Status.Failure(e);
+            reply = new Status.Failure(e);
         }
 
         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
@@ -493,8 +481,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
-        mBean.addLocalShard(shardId.toString());
-
         if(schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
@@ -545,10 +531,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         if(!shardInfo.isShardInitialized()) {
             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
-            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+            message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
         } else {
             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
-            message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+            message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
         }
     }
 
@@ -618,9 +604,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         String actorName = sender.path().name();
         //find shard name from actor name; actor name is stringified shardId
-        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
 
-        if (shardId.getShardName() == null) {
+        final ShardIdentifier shardId;
+        try {
+            shardId = ShardIdentifier.fromShardIdString(actorName);
+        } catch (IllegalArgumentException e) {
+            LOG.debug("{}: ignoring actor {}", actorName, e);
             return;
         }
 
@@ -679,12 +668,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
-            @Override
-            public Object get() {
-                return new LocalShardFound(shardInformation.getActor());
-            }
-        });
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
     }
 
     private void sendResponse(ShardInformation shardInformation, boolean doWait,
@@ -694,12 +678,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
 
-                Runnable replyRunnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        sender.tell(messageSupplier.get(), self);
-                    }
-                };
+                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
 
                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                     new OnShardInitialized(replyRunnable);
@@ -727,11 +706,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             } else if (!shardInformation.isShardInitialized()) {
                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                         shardInformation.getShardName());
-                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
+                getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
             } else {
                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                         shardInformation.getShardName());
-                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
+                getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
             }
 
             return;
@@ -749,10 +728,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
 
+    @VisibleForTesting
+    static MemberName memberToName(final Member member) {
+        return MemberName.forName(member.roles().iterator().next());
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -763,9 +747,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -776,17 +760,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
+    private void memberUp(MemberName memberName, Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
-    private void addPeerAddress(String memberName, Address address) {
+    private void memberWeaklyUp(MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
+    private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
@@ -799,8 +795,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
@@ -808,16 +804,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
 
-    private void markMemberUnavailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberUnavailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
                 info.setLeaderAvailable(false);
 
@@ -828,10 +826,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void markMemberAvailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberAvailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
@@ -847,17 +847,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onSwitchShardBehavior(SwitchShardBehavior message) {
-        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+    private void onGetLocalShardIds() {
+        final List<String> response = new ArrayList<>(localShards.size());
+
+        for (ShardInformation info : localShards.values()) {
+            response.add(info.getShardId().toString());
+        }
+
+        getSender().tell(new Status.Success(response), getSelf());
+    }
+
+    private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+        final ShardIdentifier identifier = message.getShardId();
 
-        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+        if (identifier != null) {
+            final ShardInformation info = localShards.get(identifier.getShardName());
+            if (info == null) {
+                getSender().tell(new Status.Failure(
+                    new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+                return;
+            }
 
-        if(shardInformation != null && shardInformation.getActor() != null) {
-            shardInformation.getActor().tell(
-                    new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+            switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
         } else {
+            for (ShardInformation info : localShards.values()) {
+                switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+            }
+        }
+
+        getSender().tell(new Status.Success(null), getSelf());
+    }
+
+    private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+        final ActorRef actor = info.getActor();
+        if (actor != null) {
+            actor.tell(switchBehavior, getSelf());
+          } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
-                    message.getShardName(), message.getNewState());
+                info.getShardName(), switchBehavior.getNewState());
         }
     }
 
@@ -901,20 +928,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null && info.isActiveMember()) {
-            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
-                @Override
-                public Object get() {
-                    String primaryPath = info.getSerializedLeaderActor();
-                    Object found = canReturnLocalShardState && info.isLeader() ?
-                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                            }
+            sendResponse(info, message.isWaitUntilReady(), true, () -> {
+                String primaryPath = info.getSerializedLeaderActor();
+                Object found = canReturnLocalShardState && info.isLeader() ?
+                        new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                        }
 
-                            return found;
-                }
+                        return found;
             });
 
             return;
@@ -956,7 +980,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      * @return
      */
-    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
@@ -966,7 +990,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      *
      */
     private void createLocalShards() {
-        String memberName = this.cluster.getCurrentMemberName();
+        MemberName memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
@@ -988,7 +1012,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
-            mBean.addLocalShard(shardId.toString());
         }
     }
 
@@ -998,13 +1021,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<String> members = configuration.getMembersFromShardName(shardName);
+        Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         Map<String, String> peerAddresses = new HashMap<>();
 
-        String currentMemberName = this.cluster.getCurrentMemberName();
+        MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members) {
-            if(!currentMemberName.equals(memberName)) {
+        for (MemberName memberName : members) {
+            if (!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
                 peerAddresses.put(shardId.toString(), address);
@@ -1017,13 +1040,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-                new Function<Throwable, SupervisorStrategy.Directive>() {
-            @Override
-            public SupervisorStrategy.Directive apply(Throwable t) {
-                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                return SupervisorStrategy.resume();
-            }
-        }
+                (Function<Throwable, Directive>) t -> {
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                    return SupervisorStrategy.resume();
+                }
                 );
 
     }
@@ -1042,7 +1062,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (shardReplicaOperationsInProgress.contains(shardName)) {
             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return true;
         }
 
@@ -1058,7 +1078,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (!(this.configuration.isShardConfigured(shardName))) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
@@ -1067,14 +1087,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String msg = String.format(
                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
             }
 
             @Override
@@ -1088,7 +1108,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
         LOG.debug ("{}: {}", persistenceId(), msg);
-        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+        sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
     }
 
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
@@ -1158,7 +1178,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
-        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+        sender.tell(new Status.Failure(message == null ? failure :
             new RuntimeException(message, failure)), getSelf());
     }
 
@@ -1177,8 +1197,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo.setActiveMember(true);
             persistShardList();
 
-            mBean.addLocalShard(shardInfo.getShardId().toString());
-            sender.tell(new akka.actor.Status.Success(null), getSelf());
+            sender.tell(new Status.Success(null), getSelf());
         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
             sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
@@ -1223,12 +1242,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
             }
         });
     }
@@ -1254,7 +1277,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
-        String currentMember = cluster.getCurrentMemberName();
+        final MemberName currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
         for (String shard : currentSnapshot.getShardList()) {
@@ -1280,268 +1303,171 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             0, 0));
     }
 
-    private static class ForwardedAddServerReply {
-        ShardInformation shardInfo;
-        AddServerReply addServerReply;
-        String leaderPath;
-        boolean removeShardOnFailure;
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
 
-        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
-                boolean removeShardOnFailure) {
-            this.shardInfo = shardInfo;
-            this.addServerReply = addServerReply;
-            this.leaderPath = leaderPath;
-            this.removeShardOnFailure = removeShardOnFailure;
+        String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+                    e.getValue());
         }
-    }
 
-    private static class ForwardedAddServerFailure {
-        String shardName;
-        String failureMessage;
-        Throwable failure;
-        boolean removeShardOnFailure;
+        ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
 
-        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
-                boolean removeShardOnFailure) {
-            this.shardName = shardName;
-            this.failureMessage = failureMessage;
-            this.failure = failure;
-            this.removeShardOnFailure = removeShardOnFailure;
-        }
+        findLocalShard(shardName, getSender(),
+                localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+                        localShardFound.getPath(), getSender()));
     }
 
-    @VisibleForTesting
-    protected static class ShardInformation {
-        private final ShardIdentifier shardId;
-        private final String shardName;
-        private ActorRef actor;
-        private final Map<String, String> initialPeerAddresses;
-        private Optional<DataTree> localShardDataTree;
-        private boolean leaderAvailable = false;
-
-        // flag that determines if the actor is ready for business
-        private boolean actorInitialized = false;
-
-        private boolean followerSyncStatus = false;
-
-        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
-        private String role ;
-        private String leaderId;
-        private short leaderVersion;
-
-        private DatastoreContext datastoreContext;
-        private Shard.AbstractBuilder<?, ?> builder;
-        private final ShardPeerAddressResolver addressResolver;
-        private boolean isActiveMember = true;
-
-        private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
-            this.shardName = shardName;
-            this.shardId = shardId;
-            this.initialPeerAddresses = initialPeerAddresses;
-            this.datastoreContext = datastoreContext;
-            this.builder = builder;
-            this.addressResolver = addressResolver;
-        }
-
-        Props newProps(SchemaContext schemaContext) {
-            Preconditions.checkNotNull(builder);
-            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
-                    schemaContext(schemaContext).props();
-            builder = null;
-            return props;
-        }
-
-        String getShardName() {
-            return shardName;
-        }
-
-        @Nullable
-        ActorRef getActor(){
-            return actor;
-        }
-
-        void setActor(ActorRef actor) {
-            this.actor = actor;
-        }
+    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
 
-        ShardIdentifier getShardId() {
-            return shardId;
-        }
-
-        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
-            this.localShardDataTree = localShardDataTree;
-        }
+        ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, localShardFound -> {
+            Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                    Timeout.apply(30, TimeUnit.SECONDS));
 
-        Optional<DataTree> getLocalShardDataTree() {
-            return localShardDataTree;
-        }
-
-        DatastoreContext getDatastoreContext() {
-            return datastoreContext;
-        }
+            future.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) {
+                    if (failure != null) {
+                        sender.tell(new Status.Failure(new RuntimeException(
+                                String.format("Failed to access local shard %s", shardName), failure)), self());
+                        return;
+                    }
 
-        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
-            this.datastoreContext = datastoreContext;
-            if (actor != null) {
-                LOG.debug ("Sending new DatastoreContext to {}", shardId);
-                actor.tell(this.datastoreContext, sender);
-            }
-        }
+                    OnDemandRaftState raftState = (OnDemandRaftState) response;
+                    Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                    for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                        serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                    }
 
-        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+                            toString(), !raftState.isVoting());
 
-            if(actor != null) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                            peerId, peerAddress, actor.path());
+                    changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                            shardName, localShardFound.getPath(), sender);
                 }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
 
-                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
-            }
-
-            notifyOnShardInitializedCallbacks();
-        }
-
-        void peerDown(String memberName, String peerId, ActorRef sender) {
-            if(actor != null) {
-                actor.tell(new PeerDown(memberName, peerId), sender);
-            }
-        }
-
-        void peerUp(String memberName, String peerId, ActorRef sender) {
-            if(actor != null) {
-                actor.tell(new PeerUp(memberName, peerId), sender);
-            }
-        }
-
-        boolean isShardReady() {
-            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
-        }
-
-        boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || addressResolver.resolve(leaderId) != null);
-        }
-
-        boolean isShardInitialized() {
-            return getActor() != null && actorInitialized;
-        }
-
-        boolean isLeader() {
-            return Objects.equal(leaderId, shardId.toString());
-        }
-
-        String getSerializedLeaderActor() {
-            if(isLeader()) {
-                return Serialization.serializedActorPath(getActor());
-            } else {
-                return addressResolver.resolve(leaderId);
-            }
-        }
-
-        void setActorInitialized() {
-            LOG.debug("Shard {} is initialized", shardId);
-
-            this.actorInitialized = true;
-
-            notifyOnShardInitializedCallbacks();
-        }
-
-        private void notifyOnShardInitializedCallbacks() {
-            if(onShardInitializedSet.isEmpty()) {
-                return;
-            }
-
-            boolean ready = isShardReadyWithLeaderId();
+    }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
-                        ready ? "ready" : "initialized", onShardInitializedSet.size());
-            }
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Consumer<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
 
-            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
-            while(iter.hasNext()) {
-                OnShardInitialized onShardInitialized = iter.next();
-                if(!(onShardInitialized instanceof OnShardReady) || ready) {
-                    iter.remove();
-                    onShardInitialized.getTimeoutSchedule().cancel();
-                    onShardInitialized.getReplyRunnable().run();
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if(response instanceof LocalShardFound) {
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
+                    } else if(response instanceof LocalShardNotFound) {
+                        String msg = String.format("Local shard %s does not exist", shardName);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+                    } else {
+                        String msg = String.format("Failed to find local shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+                                new RuntimeException(msg)), self());
+                    }
                 }
             }
-        }
-
-        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
-            onShardInitializedSet.add(onShardInitialized);
-        }
-
-        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
-            onShardInitializedSet.remove(onShardInitialized);
-        }
-
-        void setRole(String newRole) {
-            this.role = newRole;
-
-            notifyOnShardInitializedCallbacks();
-        }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
 
-        void setFollowerSyncStatus(boolean syncStatus){
-            this.followerSyncStatus = syncStatus;
+    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
         }
 
-        boolean isInSync(){
-            if(RaftState.Follower.name().equals(this.role)){
-                return followerSyncStatus;
-            } else if(RaftState.Leader.name().equals(this.role)){
-                return true;
-            }
+        shardReplicaOperationsInProgress.add(shardName);
 
-            return false;
-        }
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        boolean setLeaderId(String leaderId) {
-            boolean changed = !Objects.equal(this.leaderId, leaderId);
-            this.leaderId = leaderId;
-            if(leaderId != null) {
-                this.leaderAvailable = true;
-            }
-            notifyOnShardInitializedCallbacks();
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
 
-            return changed;
-        }
-
-        String getLeaderId() {
-            return leaderId;
-        }
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        void setLeaderAvailable(boolean leaderAvailable) {
-            this.leaderAvailable = leaderAvailable;
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path());
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member must be voting",
+                                shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
 
-            if(leaderAvailable) {
-                notifyOnShardInitializedCallbacks();
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
             }
-        }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
 
-        short getLeaderVersion() {
-            return leaderVersion;
-        }
+    private static final class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
 
-        void setLeaderVersion(short leaderVersion) {
-            this.leaderVersion = leaderVersion;
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
         }
+    }
 
-        boolean isActiveMember() {
-            return isActiveMember;
-        }
+    private static final class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
 
-        void setActiveMember(boolean isActiveMember) {
-            this.isActiveMember = isActiveMember;
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
         }
     }
 
-    private static class OnShardInitialized {
+    static class OnShardInitialized {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
@@ -1562,112 +1488,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static class OnShardReady extends OnShardInitialized {
+    static class OnShardReady extends OnShardInitialized {
         OnShardReady(Runnable replyRunnable) {
             super(replyRunnable);
         }
     }
 
-    private static class ShardNotInitializedTimeout {
-        private final ActorRef sender;
-        private final ShardInformation shardInfo;
-        private final OnShardInitialized onShardInitialized;
-
-        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
-            this.sender = sender;
-            this.shardInfo = shardInfo;
-            this.onShardInitialized = onShardInitialized;
-        }
-
-        ActorRef getSender() {
-            return sender;
-        }
-
-        ShardInformation getShardInfo() {
-            return shardInfo;
-        }
-
-        OnShardInitialized getOnShardInitialized() {
-            return onShardInitialized;
-        }
-    }
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
-        private ClusterWrapper cluster;
-        private Configuration configuration;
-        private DatastoreContextFactory datastoreContextFactory;
-        private CountDownLatch waitTillReadyCountdownLatch;
-        private PrimaryShardInfoFutureCache primaryShardInfoCache;
-        private DatastoreSnapshot restoreFromSnapshot;
-        private volatile boolean sealed;
-
-        @SuppressWarnings("unchecked")
-        private T self() {
-            return (T) this;
-        }
-
-        protected void checkSealed() {
-            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
-        }
-
-        public T cluster(ClusterWrapper cluster) {
-            checkSealed();
-            this.cluster = cluster;
-            return self();
-        }
-
-        public T configuration(Configuration configuration) {
-            checkSealed();
-            this.configuration = configuration;
-            return self();
-        }
-
-        public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
-            checkSealed();
-            this.datastoreContextFactory = datastoreContextFactory;
-            return self();
-        }
-
-        public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
-            checkSealed();
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            return self();
-        }
-
-        public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            checkSealed();
-            this.primaryShardInfoCache = primaryShardInfoCache;
-            return self();
-        }
-
-        public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
-            checkSealed();
-            this.restoreFromSnapshot = restoreFromSnapshot;
-            return self();
-        }
-
-        protected void verify() {
-            sealed = true;
-            Preconditions.checkNotNull(cluster, "cluster should not be null");
-            Preconditions.checkNotNull(configuration, "configuration should not be null");
-            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
-            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-        }
-
-        public Props props() {
-            verify();
-            return Props.create(ShardManager.class, this);
-        }
-    }
-
-    public static class Builder extends AbstractBuilder<Builder> {
-    }
-
     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
                 getShardInitializationTimeout().duration().$times(2));
@@ -1692,6 +1518,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private static interface RunnableMessage extends Runnable {
+    }
+
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
      * a remote or local find primary message is processed
@@ -1759,7 +1588,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         @Override
         public void onFailure(Throwable failure) {
             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
-            targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+            targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
@@ -1768,67 +1597,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
             LOG.debug ("{}: {}", persistenceId, msg);
-            targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+            targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                     new RuntimeException(msg)), shardManagerActor);
         }
     }
 
-
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
-        }
-    }
-
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */
-    private static class WrappedShardResponse {
+    private static final class WrappedShardResponse {
         private final ShardIdentifier shardId;
         private final Object response;
         private final String leaderPath;
 
-        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+        WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
@@ -1846,6 +1628,30 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return leaderPath;
         }
     }
+
+    private static final class ShardNotInitializedTimeout {
+        private final ActorRef sender;
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.sender = sender;
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return onShardInitialized;
+        }
+    }
 }