Bug 8385: Fix testMultipleRegistrationsAtOnePrefix failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index e314e1c894242d802bb242d034de86a403de52b0..bb289bc82e365177e55d320ffbbbe92d1cd8f404 100644 (file)
@@ -9,15 +9,18 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -27,78 +30,87 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.SerializationUtils;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,16 +120,15 @@ import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
  * <ul>
  * <li> Create all the local shard replicas that belong on this cluster member
  * <li> Find the address of the local shard
  * <li> Find the primary replica for any given shard
  * <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
  */
 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a shard name and it's corresponding information
@@ -135,7 +146,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private final ShardManagerInfo mBean;
+    private final ShardManagerInfo shardManagerMBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -153,16 +164,22 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
+    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+
     private final String persistenceId;
+    private final AbstractDataStore dataStore;
+
+    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+    private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     ShardManager(AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
-        this.datastoreContextFactory = builder.getDdatastoreContextFactory();
+        this.datastoreContextFactory = builder.getDatastoreContextFactory();
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -174,101 +191,145 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        List<String> localShardActorNames = new ArrayList<>();
-        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+        shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
                 "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
-                localShardActorNames);
-        mBean.setShardManager(this);
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+        shardManagerMBean.registerMBean();
+
+        dataStore = builder.getDistributedDataStore();
+    }
+
+    @Override
+    public void preStart() {
+        LOG.info("Starting ShardManager {}", persistenceId);
     }
 
     @Override
     public void postStop() {
         LOG.info("Stopping ShardManager {}", persistenceId());
 
-        mBean.unregisterMBean();
+        shardManagerMBean.unregisterMBean();
+
+        if (configListenerReg != null) {
+            configListenerReg.close();
+            configListenerReg = null;
+        }
     }
 
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
-        } else if(message instanceof FindLocalShard){
+        } else if (message instanceof FindLocalShard) {
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
-        } else if(message instanceof ActorInitialized) {
+        } else if (message instanceof ActorInitialized) {
             onActorInitialized(message);
-        } else if (message instanceof ClusterEvent.MemberUp){
+        } else if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
-        } else if (message instanceof ClusterEvent.MemberExited){
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited) {
             memberExited((ClusterEvent.MemberExited) message);
-        } else if(message instanceof ClusterEvent.MemberRemoved) {
+        } else if (message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
-        } else if(message instanceof ClusterEvent.UnreachableMember) {
-            memberUnreachable((ClusterEvent.UnreachableMember)message);
-        } else if(message instanceof ClusterEvent.ReachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
+            memberUnreachable((ClusterEvent.UnreachableMember) message);
+        } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContextFactory) {
-            onDatastoreContextFactory((DatastoreContextFactory)message);
-        } else if(message instanceof RoleChangeNotification) {
+        } else if (message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory) message);
+        } else if (message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
-        } else if(message instanceof FollowerInitialSyncUpStatus){
+        } else if (message instanceof FollowerInitialSyncUpStatus) {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
-        } else if(message instanceof ShardNotInitializedTimeout) {
-            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof ShardLeaderStateChanged) {
+        } else if (message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+        } else if (message instanceof ShardLeaderStateChanged) {
             onLeaderStateChanged((ShardLeaderStateChanged) message);
-        } else if(message instanceof SwitchShardBehavior){
+        } else if (message instanceof SwitchShardBehavior) {
             onSwitchShardBehavior((SwitchShardBehavior) message);
-        } else if(message instanceof CreateShard) {
+        } else if (message instanceof CreateShard) {
             onCreateShard((CreateShard)message);
-        } else if(message instanceof AddShardReplica){
-            onAddShardReplica((AddShardReplica)message);
-        } else if(message instanceof ForwardedAddServerReply) {
+        } else if (message instanceof AddShardReplica) {
+            onAddShardReplica((AddShardReplica) message);
+        } else if (message instanceof AddPrefixShardReplica) {
+            onAddPrefixShardReplica((AddPrefixShardReplica) message);
+        } else if (message instanceof PrefixShardCreated) {
+            onPrefixShardCreated((PrefixShardCreated) message);
+        } else if (message instanceof PrefixShardRemoved) {
+            onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof InitConfigListener) {
+            onInitConfigListener();
+        } else if (message instanceof ForwardedAddServerReply) {
             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                     msg.removeShardOnFailure);
-        } else if(message instanceof ForwardedAddServerFailure) {
+        } else if (message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
-        } else if(message instanceof RemoveShardReplica) {
+        } else if (message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
-        } else if(message instanceof WrappedShardResponse){
+        } else if (message instanceof RemovePrefixShardReplica) {
+            onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
+        } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
-        } else if(message instanceof GetSnapshot) {
+        } else if (message instanceof GetSnapshot) {
             onGetSnapshot();
-        } else if(message instanceof ServerRemoved){
+        } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if(message instanceof SaveSnapshotSuccess) {
-            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if(message instanceof SaveSnapshotFailure) {
-            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
-                    persistenceId(), ((SaveSnapshotFailure) message).cause());
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof ChangeShardMembersVotingStatus) {
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if (message instanceof FlipShardMembersVotingStatus) {
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+                    ((SaveSnapshotFailure) message).cause());
+        } else if (message instanceof Shutdown) {
             onShutDown();
+        } else if (message instanceof GetLocalShardIds) {
+            onGetLocalShardIds();
+        } else if (message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onInitConfigListener() {
+        LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+        final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+                org.opendaylight.mdsal.common.api.LogicalDatastoreType
+                        .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+        if (configUpdateHandler != null) {
+            configUpdateHandler.close();
+        }
+
+        configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+        configUpdateHandler.initListener(dataStore, type);
+    }
+
     private void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
 
-                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+                        .getElectionTimeOutInterval().$times(2);
                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
             }
         }
 
         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
 
-        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+                .getDispatcher(Dispatchers.DispatcherType.Client);
         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
@@ -278,17 +339,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 self().tell(PoisonPill.getInstance(), self());
 
-                if(failure != null) {
+                if (failure != null) {
                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
                 } else {
                     int nfailed = 0;
-                    for(Boolean r: results) {
-                        if(!r) {
+                    for (Boolean result : results) {
+                        if (!result) {
                             nfailed++;
                         }
                     }
 
-                    if(nfailed > 0) {
+                    if (nfailed > 0) {
                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
                     }
                 }
@@ -305,37 +366,66 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
-        shardReplicaOperationsInProgress.remove(shardId);
+        shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
-        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+        LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+            LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
                     shardId.getShardName());
-            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+            originalSender.tell(new Status.Success(null), getSelf());
         } else {
-            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
                     persistenceId(), shardId, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
-                    leaderPath, shardId);
-            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+            originalSender.tell(new Status.Failure(failure), getSelf());
         }
     }
 
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+    private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+                                          final String primaryPath, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
         }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+        final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+        //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+                primaryPath, shardId);
+
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+        Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+                new RemoveServer(shardId.toString()), removeServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
+                    String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+                            primaryPath, shardName);
+
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+                    // FAILURE
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    // SUCCESS
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -346,11 +436,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
 
         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
-        LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
                 primaryPath, shardId);
 
-        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
-                duration());
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
@@ -358,10 +447,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
-                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
 
                     // FAILURE
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
@@ -374,16 +464,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onShardReplicaRemoved(ServerRemoved message) {
-        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
-        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
-        if(shardInformation == null) {
+        removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeShard(final ShardIdentifier shardId) {
+        final String shardName = shardId.getShardName();
+        final ShardInformation shardInformation = localShards.remove(shardName);
+        if (shardInformation == null) {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
-        } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(Shutdown.INSTANCE, self());
         }
-        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+        final ActorRef shardActor = shardInformation.getActor();
+        if (shardActor != null) {
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
+            FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
+                    .getElectionTimeOutInterval().$times(3);
+            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
+            shardActorStoppingFutures.put(shardName, stopFuture);
+            stopFuture.onComplete(new OnComplete<Boolean>() {
+                @Override
+                public void onComplete(Throwable failure, Boolean result) {
+                    if (failure == null) {
+                        LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+                    } else {
+                        LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+                    }
+
+                    self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
+                            ActorRef.noSender());
+                }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        }
+
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
         persistShardList();
     }
 
@@ -391,9 +506,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
-        for(ShardInformation shardInfo: localShards.values()) {
-            if(!shardInfo.isShardInitialized()) {
-                if(notInitialized == null) {
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardInitialized()) {
+                if (notInitialized == null) {
                     notInitialized = new ArrayList<>();
                 }
 
@@ -401,57 +516,134 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
-        if(notInitialized != null) {
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+        if (notInitialized != null) {
+            getSender().tell(new Status.Failure(new IllegalStateException(String.format(
                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
             return;
         }
 
-        byte[] shardManagerSnapshot = null;
-        if(currentSnapshot != null) {
-            shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
-        }
-
         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
-                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+                new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
-        for(ShardInformation shardInfo: localShards.values()) {
+        for (ShardInformation shardInfo: localShards.values()) {
             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreateShard(CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
-            if(localShards.containsKey(shardName)) {
+            if (localShards.containsKey(shardName)) {
                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
-                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
-                reply = new akka.actor.Status.Success(null);
+                reply = new Status.Success(null);
             }
         } catch (Exception e) {
             LOG.error("{}: onCreateShard failed", persistenceId(), e);
-            reply = new akka.actor.Status.Failure(e);
+            reply = new Status.Failure(e);
         }
 
-        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
             getSender().tell(reply, getSelf());
         }
     }
 
-    private void doCreateShard(CreateShard createShard) {
-        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-        String shardName = moduleShardConfig.getShardName();
+    private void onPrefixShardCreated(final PrefixShardCreated message) {
+        LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
+
+        final PrefixShardConfiguration config = message.getConfiguration();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
+        final String shardName = shardId.getShardName();
+
+        if (isPreviousShardActorStopInProgress(shardName, message)) {
+            return;
+        }
+
+        if (localShards.containsKey(shardName)) {
+            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+            final PrefixShardConfiguration existing =
+                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+            if (existing != null && existing.equals(config)) {
+                // we don't have to do nothing here
+                return;
+            }
+        }
+
+        doCreatePrefixShard(config, shardId, shardName);
+    }
+
+    private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+        final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
+        if (stopFuture == null) {
+            return false;
+        }
+
+        LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+                shardName, messageToDefer);
+        final ActorRef sender = getSender();
+        stopFuture.onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(Throwable failure, Boolean result) {
+                LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+                self().tell(messageToDefer, sender);
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+        return true;
+    }
+
+    private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
+        configuration.addPrefixShardConfiguration(config);
+
+        final Builder builder = newShardDatastoreContextBuilder(shardName);
+        builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                .storeRoot(config.getPrefix().getRootIdentifier());
+        DatastoreContext shardDatastoreContext = builder.build();
+
+        final Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        final boolean isActiveMember = true;
+
+        LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, Shard.builder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+        LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+        configuration.removePrefixShardConfiguration(prefix);
+        removeShard(shardId);
+    }
+
+    private void doCreateShard(final CreateShard createShard) {
+        final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        final String shardName = moduleShardConfig.getShardName();
 
         configuration.addModuleShardConfiguration(moduleShardConfig);
 
         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-        if(shardDatastoreContext == null) {
+        if (shardDatastoreContext == null) {
             shardDatastoreContext = newShardDatastoreContext(shardName);
         } else {
             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
@@ -460,13 +652,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
-                currentSnapshot.getShardList().contains(shardName);
+        boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+                && currentSnapshot.getShardList().contains(shardName);
 
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
-                contains(cluster.getCurrentMemberName())) {
+        if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+                .contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
@@ -476,8 +668,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
             peerAddresses = Collections.emptyMap();
-            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
-                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
 
         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
@@ -489,23 +681,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
-        mBean.addLocalShard(shardId.toString());
-
-        if(schemaContext != null) {
+        if (schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
     }
 
     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
-        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
-                shardPeerAddressResolver(peerAddressResolver);
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+                .shardPeerAddressResolver(peerAddressResolver);
     }
 
     private DatastoreContext newShardDatastoreContext(String shardName) {
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
-    private void checkReady(){
+    private void checkReady() {
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
@@ -518,10 +708,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
-            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+            if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
             }
 
@@ -539,12 +729,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
-        if(!shardInfo.isShardInitialized()) {
+        if (!shardInfo.isShardInitialized()) {
             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
-            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+            message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
         } else {
             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
-            message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+            message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
         }
     }
 
@@ -554,10 +744,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardInformation shardInformation = findShardInformation(status.getName());
 
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
 
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
 
     }
@@ -567,17 +757,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
             checkReady();
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
     }
 
 
     private ShardInformation findShardInformation(String memberId) {
-        for(ShardInformation info : localShards.values()){
-            if(info.getShardId().toString().equals(memberId)){
+        for (ShardInformation info : localShards.values()) {
+            if (info.getShardId().toString().equals(memberId)) {
                 return info;
             }
         }
@@ -588,7 +778,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReadyWithLeaderId()){
+            if (!info.isShardReadyWithLeaderId()) {
                 isReady = false;
                 break;
             }
@@ -596,9 +786,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
-    private boolean isInSync(){
+    private boolean isInSync() {
         for (ShardInformation info : localShards.values()) {
-            if(!info.isInSync()){
+            if (!info.isInSync()) {
                 return false;
             }
         }
@@ -614,9 +804,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         String actorName = sender.path().name();
         //find shard name from actor name; actor name is stringified shardId
-        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
 
-        if (shardId.getShardName() == null) {
+        final ShardIdentifier shardId;
+        try {
+            shardId = ShardIdentifier.fromShardIdString(actorName);
+        } catch (IllegalArgumentException e) {
+            LOG.debug("{}: ignoring actor {}", actorName, e);
             return;
         }
 
@@ -643,6 +836,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void onRecoveryCompleted() {
         LOG.info("Recovery complete : {}", persistenceId());
 
@@ -650,60 +844,35 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // journal on upgrade from Helium.
         deleteMessages(lastSequenceNr());
 
-        if(currentSnapshot == null && restoreFromSnapshot != null &&
-                restoreFromSnapshot.getShardManagerSnapshot() != null) {
-            try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
-                    restoreFromSnapshot.getShardManagerSnapshot()))) {
-                ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+        if (currentSnapshot == null && restoreFromSnapshot != null
+                && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+            ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
 
-                LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+            LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
 
-                applyShardManagerSnapshot(snapshot);
-            } catch(Exception e) {
-                LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
-            }
+            applyShardManagerSnapshot(snapshot);
         }
 
         createLocalShards();
     }
 
-    private void findLocalShard(FindLocalShard message) {
-        final ShardInformation shardInformation = localShards.get(message.getShardName());
-
-        if(shardInformation == null){
-            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
-            return;
-        }
-
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
-            @Override
-            public Object get() {
-                return new LocalShardFound(shardInformation.getActor());
-            }
-        });
-    }
-
     private void sendResponse(ShardInformation shardInformation, boolean doWait,
             boolean wantShardReady, final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
-            if(doWait) {
+        if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+            if (doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
 
-                Runnable replyRunnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        sender.tell(messageSupplier.get(), self);
-                    }
-                };
+                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
 
                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                     new OnShardInitialized(replyRunnable);
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
-                if(shardInformation.isShardInitialized()) {
+                FiniteDuration timeout = shardInformation.getDatastoreContext()
+                        .getShardInitializationTimeout().duration();
+                if (shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
@@ -723,11 +892,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             } else if (!shardInformation.isShardInitialized()) {
                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                         shardInformation.getShardName());
-                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
+                getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
             } else {
                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                         shardInformation.getShardName());
-                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
+                getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
             }
 
             return;
@@ -745,47 +914,64 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
 
+    @VisibleForTesting
+    static MemberName memberToName(final Member member) {
+        return MemberName.forName(member.roles().iterator().next());
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
     private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
+    private void memberUp(MemberName memberName, Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
-    private void addPeerAddress(String memberName, Address address) {
+    private void memberWeaklyUp(MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
+    private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             String shardName = info.getShardName();
             String peerId = getShardIdentifier(memberName, shardName).toString();
             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
@@ -795,8 +981,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
@@ -804,16 +990,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
 
-    private void markMemberUnavailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberUnavailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
                 info.setLeaderAvailable(false);
 
@@ -824,10 +1012,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void markMemberAvailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberAvailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
@@ -843,24 +1033,51 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onSwitchShardBehavior(SwitchShardBehavior message) {
-        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+    private void onGetLocalShardIds() {
+        final List<String> response = new ArrayList<>(localShards.size());
 
-        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+        for (ShardInformation info : localShards.values()) {
+            response.add(info.getShardId().toString());
+        }
+
+        getSender().tell(new Status.Success(response), getSelf());
+    }
+
+    private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+        final ShardIdentifier identifier = message.getShardId();
+
+        if (identifier != null) {
+            final ShardInformation info = localShards.get(identifier.getShardName());
+            if (info == null) {
+                getSender().tell(new Status.Failure(
+                    new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+                return;
+            }
 
-        if(shardInformation != null && shardInformation.getActor() != null) {
-            shardInformation.getActor().tell(
-                    new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+            switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+        } else {
+            for (ShardInformation info : localShards.values()) {
+                switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+            }
+        }
+
+        getSender().tell(new Status.Success(null), getSelf());
+    }
+
+    private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+        final ActorRef actor = info.getActor();
+        if (actor != null) {
+            actor.tell(switchBehavior, getSelf());
         } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
-                    message.getShardName(), message.getNewState());
+                info.getShardName(), switchBehavior.getNewState());
         }
     }
 
     /**
-     * Notifies all the local shards of a change in the schema context
+     * Notifies all the local shards of a change in the schema context.
      *
-     * @param message
+     * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
@@ -883,9 +1100,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
-        return getContext().actorOf(info.newProps(schemaContext)
-                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+    protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
+        return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+                info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
@@ -897,27 +1114,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null && info.isActiveMember()) {
-            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
-                @Override
-                public Object get() {
-                    String primaryPath = info.getSerializedLeaderActor();
-                    Object found = canReturnLocalShardState && info.isLeader() ?
-                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                            }
-
-                            return found;
-                }
+            sendResponse(info, message.isWaitUntilReady(), true, () -> {
+                String primaryPath = info.getSerializedLeaderActor();
+                Object found = canReturnLocalShardState && info.isLeader()
+                        ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                return found;
             });
 
             return;
         }
 
         final Collection<String> visitedAddresses;
-        if(message instanceof RemoteFindPrimary) {
+        if (message instanceof RemoteFindPrimary) {
             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
         } else {
             visitedAddresses = new ArrayList<>(1);
@@ -925,8 +1136,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
 
-        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
-            if(visitedAddresses.contains(address)) {
+        for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if (visitedAddresses.contains(address)) {
                 continue;
             }
 
@@ -944,38 +1155,58 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
+    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    handler.onFailure(failure);
+                } else {
+                    if (response instanceof RemotePrimaryShardFound) {
+                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+                    } else if (response instanceof LocalPrimaryShardFound) {
+                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+                    } else {
+                        handler.onUnknownResponse(response);
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     /**
      * Construct the name of the shard actor given the name of the member on
-     * which the shard resides and the name of the shard
+     * which the shard resides and the name of the shard.
      *
-     * @param memberName
-     * @param shardName
-     * @return
+     * @param memberName the member name
+     * @param shardName the shard name
+     * @return a b
      */
-    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
-     * Create shards that are local to the member on which the ShardManager
-     * runs
-     *
+     * Create shards that are local to the member on which the ShardManager runs.
      */
     private void createLocalShards() {
-        String memberName = this.cluster.getCurrentMemberName();
+        MemberName memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
-        if(restoreFromSnapshot != null)
-        {
-            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+        if (restoreFromSnapshot != null) {
+            for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
                 shardSnapshots.put(snapshot.getName(), snapshot);
             }
         }
 
         restoreFromSnapshot = null; // null out to GC
 
-        for(String shardName : memberShardNames){
+        for (String shardName : memberShardNames) {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
 
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
@@ -984,23 +1215,25 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
-            mBean.addLocalShard(shardId.toString());
         }
     }
 
     /**
-     * Given the name of the shard find the addresses of all it's peers
+     * Given the name of the shard find the addresses of all it's peers.
      *
-     * @param shardName
+     * @param shardName the shard name
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<String> members = configuration.getMembersFromShardName(shardName);
-        Map<String, String> peerAddresses = new HashMap<>();
+        final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+        return getPeerAddresses(shardName, members);
+    }
 
-        String currentMemberName = this.cluster.getCurrentMemberName();
+    private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+        Map<String, String> peerAddresses = new HashMap<>();
+        MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members) {
-            if(!currentMemberName.equals(memberName)) {
+        for (MemberName memberName : members) {
+            if (!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
                 peerAddresses.put(shardId.toString(), address);
@@ -1013,15 +1246,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-                new Function<Throwable, SupervisorStrategy.Directive>() {
-            @Override
-            public SupervisorStrategy.Directive apply(Throwable t) {
-                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                return SupervisorStrategy.resume();
-            }
-        }
-                );
-
+                (Function<Throwable, Directive>) t -> {
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                    return SupervisorStrategy.resume();
+                });
     }
 
     @Override
@@ -1030,31 +1258,65 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    ShardManagerInfoMBean getMBean(){
-        return mBean;
+    ShardManagerInfoMBean getMBean() {
+        return shardManagerMBean;
     }
 
     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
         if (shardReplicaOperationsInProgress.contains(shardName)) {
             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            LOG.debug("{}: {}", persistenceId(), msg);
+            sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return true;
         }
 
         return false;
     }
 
-    private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        // Create the localShard
+        if (schemaContext == null) {
+            String msg = String.format(
+                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+                        message.getShardPrefix(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+            }
+        });
+    }
+
+    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
-        if (!(this.configuration.isShardConfigured(shardName))) {
+        if (!this.configuration.isShardConfigured(shardName)) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
@@ -1062,33 +1324,70 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (schemaContext == null) {
             String msg = String.format(
                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
-        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                final RunnableMessage runnable = (RunnableMessage) () ->
+                    addShard(getShardName(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
-
         });
     }
 
     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
-        LOG.debug ("{}: {}", persistenceId(), msg);
-        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+        LOG.debug("{}: {}", persistenceId(), msg);
+        sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
+    }
+
+    private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+                                final RemotePrimaryShardFound response, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if (existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+            DatastoreContext datastoreContext = builder.build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
+
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -1097,12 +1396,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo;
         final boolean removeShardOnFailure;
         ShardInformation existingShardInfo = localShards.get(shardName);
-        if(existingShardInfo == null) {
+        if (existingShardInfo == null) {
             removeShardOnFailure = true;
             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
-                    DisableElectionsRaftPolicy.class.getName()).build();
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
 
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
@@ -1114,25 +1413,35 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = existingShardInfo;
         }
 
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+    }
+
+    private void execAddShard(final String shardName,
+                              final ShardInformation shardInfo,
+                              final RemotePrimaryShardFound response,
+                              final boolean removeShardOnFailure,
+                              final ActorRef sender) {
+
+        final String localShardAddress =
+                peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
-        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                 response.getPrimaryPath(), shardInfo.getShardId());
 
-        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
-                duration());
-        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+        final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+                .getShardLeaderElectionTimeout().duration());
+        final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+                new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
                 if (failure != null) {
-                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                    final String msg = String.format("AddServer request to leader %s for shard %s failed",
                             response.getPrimaryPath(), shardName);
                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
@@ -1147,14 +1456,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             boolean removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
-        if(removeShardOnFailure) {
+        if (removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
             }
         }
 
-        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+        sender.tell(new Status.Failure(message == null ? failure :
             new RuntimeException(message, failure)), getSelf());
     }
 
@@ -1163,25 +1472,25 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
-        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+        LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+            LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
             shardInfo.setActiveMember(true);
             persistShardList();
 
-            mBean.addLocalShard(shardInfo.getShardId().toString());
-            sender.tell(new akka.actor.Status.Success(null), getSelf());
-        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+            sender.tell(new Status.Success(null), getSelf());
+        } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
             sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
-            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+                    shardInfo.getShardId());
 
             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
@@ -1193,9 +1502,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         switch (serverChangeStatus) {
             case TIMEOUT:
                 failure = new TimeoutException(String.format(
-                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
-                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                        leaderPath, shardId.getShardName()));
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+                        + "Possible causes - there was a problem replicating the data or shard leadership changed "
+                        + "while replicating the shard data", leaderPath, shardId.getShardName()));
                 break;
             case NO_LEADER:
                 failure = createNoShardLeaderException(shardId);
@@ -1212,19 +1521,50 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return failure;
     }
 
-    private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
+            }
+        });
+    }
+
+    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+                shardName, persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
             }
         });
     }
@@ -1236,27 +1576,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardList.remove(shardInfo.getShardName());
             }
         }
-        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(updateShardManagerSnapshot(shardList));
+        LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
     }
 
-    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
-        currentSnapshot = new ShardManagerSnapshot(shardList);
+    private ShardManagerSnapshot updateShardManagerSnapshot(
+            final List<String> shardList,
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+        currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
         return currentSnapshot;
     }
 
     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
         currentSnapshot = snapshot;
 
-        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+        LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
-        String currentMember = cluster.getCurrentMemberName();
+        final MemberName currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
         for (String shard : currentSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
-                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                LOG.debug("{}: adding shard {}", persistenceId(), shard);
                 configuration.addMemberReplicaForShard(shard, currentMember);
             } else {
                 configuredShardList.remove(shard);
@@ -1264,280 +1606,202 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
         for (String shard : configuredShardList) {
             // remove the member as a replica for the shard
-            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            LOG.debug("{}: removing shard {}", persistenceId(), shard);
             configuration.removeMemberReplicaForShard(shard, currentMember);
         }
     }
 
-    private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
-        LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+        LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
             0, 0));
     }
 
-    private static class ForwardedAddServerReply {
-        ShardInformation shardInfo;
-        AddServerReply addServerReply;
-        String leaderPath;
-        boolean removeShardOnFailure;
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
 
-        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
-                boolean removeShardOnFailure) {
-            this.shardInfo = shardInfo;
-            this.addServerReply = addServerReply;
-            this.leaderPath = leaderPath;
-            this.removeShardOnFailure = removeShardOnFailure;
+        String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+                    e.getValue());
         }
-    }
 
-    private static class ForwardedAddServerFailure {
-        String shardName;
-        String failureMessage;
-        Throwable failure;
-        boolean removeShardOnFailure;
+        ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
 
-        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
-                boolean removeShardOnFailure) {
-            this.shardName = shardName;
-            this.failureMessage = failureMessage;
-            this.failure = failure;
-            this.removeShardOnFailure = removeShardOnFailure;
-        }
+        findLocalShard(shardName, getSender(),
+            localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+            localShardFound.getPath(), getSender()));
     }
 
-    @VisibleForTesting
-    protected static class ShardInformation {
-        private final ShardIdentifier shardId;
-        private final String shardName;
-        private ActorRef actor;
-        private final Map<String, String> initialPeerAddresses;
-        private Optional<DataTree> localShardDataTree;
-        private boolean leaderAvailable = false;
-
-        // flag that determines if the actor is ready for business
-        private boolean actorInitialized = false;
-
-        private boolean followerSyncStatus = false;
-
-        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
-        private String role ;
-        private String leaderId;
-        private short leaderVersion;
-
-        private DatastoreContext datastoreContext;
-        private Shard.AbstractBuilder<?, ?> builder;
-        private final ShardPeerAddressResolver addressResolver;
-        private boolean isActiveMember = true;
-
-        private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
-            this.shardName = shardName;
-            this.shardId = shardId;
-            this.initialPeerAddresses = initialPeerAddresses;
-            this.datastoreContext = datastoreContext;
-            this.builder = builder;
-            this.addressResolver = addressResolver;
-        }
-
-        Props newProps(SchemaContext schemaContext) {
-            Preconditions.checkNotNull(builder);
-            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
-                    schemaContext(schemaContext).props();
-            builder = null;
-            return props;
-        }
-
-        String getShardName() {
-            return shardName;
-        }
-
-        @Nullable
-        ActorRef getActor(){
-            return actor;
-        }
-
-        void setActor(ActorRef actor) {
-            this.actor = actor;
-        }
+    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
 
-        ShardIdentifier getShardId() {
-            return shardId;
-        }
+        ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, localShardFound -> {
+            Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                    Timeout.apply(30, TimeUnit.SECONDS));
 
-        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
-            this.localShardDataTree = localShardDataTree;
-        }
-
-        Optional<DataTree> getLocalShardDataTree() {
-            return localShardDataTree;
-        }
-
-        DatastoreContext getDatastoreContext() {
-            return datastoreContext;
-        }
+            future.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) {
+                    if (failure != null) {
+                        sender.tell(new Status.Failure(new RuntimeException(
+                                String.format("Failed to access local shard %s", shardName), failure)), self());
+                        return;
+                    }
 
-        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
-            this.datastoreContext = datastoreContext;
-            if (actor != null) {
-                LOG.debug ("Sending new DatastoreContext to {}", shardId);
-                actor.tell(this.datastoreContext, sender);
-            }
-        }
+                    OnDemandRaftState raftState = (OnDemandRaftState) response;
+                    Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                    for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                        serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                    }
 
-        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+                            .toString(), !raftState.isVoting());
 
-            if(actor != null) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                            peerId, peerAddress, actor.path());
+                    changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                            shardName, localShardFound.getPath(), sender);
                 }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
 
-                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
-            }
-
-            notifyOnShardInitializedCallbacks();
-        }
-
-        void peerDown(String memberName, String peerId, ActorRef sender) {
-            if(actor != null) {
-                actor.tell(new PeerDown(memberName, peerId), sender);
-            }
-        }
-
-        void peerUp(String memberName, String peerId, ActorRef sender) {
-            if(actor != null) {
-                actor.tell(new PeerUp(memberName, peerId), sender);
-            }
-        }
-
-        boolean isShardReady() {
-            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
-        }
-
-        boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || addressResolver.resolve(leaderId) != null);
-        }
-
-        boolean isShardInitialized() {
-            return getActor() != null && actorInitialized;
-        }
-
-        boolean isLeader() {
-            return Objects.equal(leaderId, shardId.toString());
-        }
+    }
 
-        String getSerializedLeaderActor() {
-            if(isLeader()) {
-                return Serialization.serializedActorPath(getActor());
-            } else {
-                return addressResolver.resolve(leaderId);
-            }
-        }
+    private void findLocalShard(FindLocalShard message) {
+        LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
 
-        void setActorInitialized() {
-            LOG.debug("Shard {} is initialized", shardId);
+        final ShardInformation shardInformation = localShards.get(message.getShardName());
 
-            this.actorInitialized = true;
+        if (shardInformation == null) {
+            LOG.debug("{}: Local shard {} not found - shards present: {}",
+                    persistenceId(), message.getShardName(), localShards.keySet());
 
-            notifyOnShardInitializedCallbacks();
+            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+            return;
         }
 
-        private void notifyOnShardInitializedCallbacks() {
-            if(onShardInitializedSet.isEmpty()) {
-                return;
-            }
-
-            boolean ready = isShardReadyWithLeaderId();
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
+            () -> new LocalShardFound(shardInformation.getActor()));
+    }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
-                        ready ? "ready" : "initialized", onShardInitializedSet.size());
-            }
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Consumer<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
 
-            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
-            while(iter.hasNext()) {
-                OnShardInitialized onShardInitialized = iter.next();
-                if(!(onShardInitialized instanceof OnShardReady) || ready) {
-                    iter.remove();
-                    onShardInitialized.getTimeoutSchedule().cancel();
-                    onShardInitialized.getReplyRunnable().run();
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+                            failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if (response instanceof LocalShardFound) {
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
+                                sender);
+                    } else if (response instanceof LocalShardNotFound) {
+                        String msg = String.format("Local shard %s does not exist", shardName);
+                        LOG.debug("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+                    } else {
+                        String msg = String.format("Failed to find local shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug("{}: {}", persistenceId, msg);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+                                new RuntimeException(msg)), self());
+                    }
                 }
             }
-        }
-
-        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
-            onShardInitializedSet.add(onShardInitialized);
-        }
-
-        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
-            onShardInitializedSet.remove(onShardInitialized);
-        }
-
-        void setRole(String newRole) {
-            this.role = newRole;
-
-            notifyOnShardInitializedCallbacks();
-        }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
 
-        void setFollowerSyncStatus(boolean syncStatus){
-            this.followerSyncStatus = syncStatus;
+    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
         }
 
-        boolean isInSync(){
-            if(RaftState.Follower.name().equals(this.role)){
-                return followerSyncStatus;
-            } else if(RaftState.Leader.name().equals(this.role)){
-                return true;
-            }
-
-            return false;
-        }
+        shardReplicaOperationsInProgress.add(shardName);
 
-        boolean setLeaderId(String leaderId) {
-            boolean changed = !Objects.equal(this.leaderId, leaderId);
-            this.leaderId = leaderId;
-            if(leaderId != null) {
-                this.leaderAvailable = true;
-            }
-            notifyOnShardInitializedCallbacks();
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-            return changed;
-        }
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
 
-        String getLeaderId() {
-            return leaderId;
-        }
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        void setLeaderAvailable(boolean leaderAvailable) {
-            this.leaderAvailable = leaderAvailable;
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path());
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member "
+                                + "must be voting", shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
 
-            if(leaderAvailable) {
-                notifyOnShardInitializedCallbacks();
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
             }
-        }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
 
-        short getLeaderVersion() {
-            return leaderVersion;
-        }
+    private static final class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
 
-        void setLeaderVersion(short leaderVersion) {
-            this.leaderVersion = leaderVersion;
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
         }
+    }
 
-        boolean isActiveMember() {
-            return isActiveMember;
-        }
+    private static final class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
 
-        void setActiveMember(boolean isActiveMember) {
-            this.isActiveMember = isActiveMember;
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
         }
     }
 
-    private static class OnShardInitialized {
+    static class OnShardInitialized {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
@@ -1558,110 +1822,69 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static class OnShardReady extends OnShardInitialized {
+    static class OnShardReady extends OnShardInitialized {
         OnShardReady(Runnable replyRunnable) {
             super(replyRunnable);
         }
     }
 
-    private static class ShardNotInitializedTimeout {
-        private final ActorRef sender;
-        private final ShardInformation shardInfo;
-        private final OnShardInitialized onShardInitialized;
-
-        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
-            this.sender = sender;
-            this.shardInfo = shardInfo;
-            this.onShardInitialized = onShardInitialized;
-        }
-
-        ActorRef getSender() {
-            return sender;
-        }
-
-        ShardInformation getShardInfo() {
-            return shardInfo;
-        }
-
-        OnShardInitialized getOnShardInitialized() {
-            return onShardInitialized;
-        }
-    }
-
-    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
-        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
-                getShardInitializationTimeout().duration().$times(2));
-
-
-        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(Throwable failure, Object response) {
-                if (failure != null) {
-                    handler.onFailure(failure);
-                } else {
-                    if(response instanceof RemotePrimaryShardFound) {
-                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
-                    } else if(response instanceof LocalPrimaryShardFound) {
-                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
-                    } else {
-                        handler.onUnknownResponse(response);
-                    }
-                }
-            }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    private interface RunnableMessage extends Runnable {
     }
 
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
-     * a remote or local find primary message is processed
+     * a remote or local find primary message is processed.
      */
-    private static interface FindPrimaryResponseHandler {
+    private interface FindPrimaryResponseHandler {
         /**
-         * Invoked when a Failure message is received as a response
+         * Invoked when a Failure message is received as a response.
          *
-         * @param failure
+         * @param failure the failure exception
          */
         void onFailure(Throwable failure);
 
         /**
-         * Invoked when a RemotePrimaryShardFound response is received
+         * Invoked when a RemotePrimaryShardFound response is received.
          *
-         * @param response
+         * @param response the response
          */
         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
 
         /**
-         * Invoked when a LocalPrimaryShardFound response is received
-         * @param response
+         * Invoked when a LocalPrimaryShardFound response is received.
+         *
+         * @param response the response
          */
         void onLocalPrimaryFound(LocalPrimaryShardFound response);
 
         /**
          * Invoked when an unknown response is received. This is another type of failure.
          *
-         * @param response
+         * @param response the response
          */
         void onUnknownResponse(Object response);
     }
 
     /**
      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
-     * replica and sends a wrapped Failure response to some targetActor
+     * replica and sends a wrapped Failure response to some targetActor.
      */
-    private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+    private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
         private final ActorRef targetActor;
         private final String shardName;
         private final String persistenceId;
         private final ActorRef shardManagerActor;
 
         /**
+         * Constructs an instance.
+         *
          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
          * @param shardName The name of the shard for which the primary replica had to be found
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
-        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
+                ActorRef shardManagerActor) {
             this.targetActor = Preconditions.checkNotNull(targetActor);
             this.shardName = Preconditions.checkNotNull(shardName);
             this.persistenceId = Preconditions.checkNotNull(persistenceId);
@@ -1678,8 +1901,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         @Override
         public void onFailure(Throwable failure) {
-            LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
-            targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+            LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+            targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
@@ -1687,68 +1910,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         public void onUnknownResponse(Object response) {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
-            LOG.debug ("{}: {}", persistenceId, msg);
-            targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+            LOG.debug("{}: {}", persistenceId, msg);
+            targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                     new RuntimeException(msg)), shardManagerActor);
         }
     }
 
-
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
-        }
-    }
-
     /**
      * The WrappedShardResponse class wraps a response from a Shard.
      */
-    private static class WrappedShardResponse {
+    private static final class WrappedShardResponse {
         private final ShardIdentifier shardId;
         private final Object response;
         private final String leaderPath;
 
-        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+        WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
@@ -1766,6 +1942,30 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return leaderPath;
         }
     }
+
+    private static final class ShardNotInitializedTimeout {
+        private final ActorRef sender;
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.sender = sender;
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return onShardInitialized;
+        }
+    }
 }