Add optional timeout parameter for backup rpc
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 4bd0d677968d1e5dfb638a2ee8a661ec88c1718d..d1b9aba9d666268c6ab159f11e543ebc674fefe5 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
@@ -16,11 +18,16 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.pattern.Patterns;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -28,9 +35,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,80 +43,103 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
  * <ul>
  * <li> Create all the local shard replicas that belong on this cluster member
  * <li> Find the address of the local shard
  * <li> Find the primary replica for any given shard
  * <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
  */
 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
-    private final Map<String, ShardInformation> localShards = new HashMap<>();
+    @VisibleForTesting
+    final Map<String, ShardInformation> localShards = new HashMap<>();
 
     // The type of a ShardManager reflects the type of the datastore itself
     // A data store could be of type config/operational
@@ -121,9 +149,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
-    private final String shardDispatcherPath;
+    @VisibleForTesting
+    final String shardDispatcherPath;
 
-    private final ShardManagerInfo mBean;
+    private final ShardManagerInfo shardManagerMBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -131,9 +160,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final ShardPeerAddressResolver peerAddressResolver;
+    @VisibleForTesting
+    final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -141,16 +171,23 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
+    private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+
+    private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
     private final String persistenceId;
+    private final AbstractDataStore dataStore;
+
+    private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
-    ShardManager(AbstractShardManagerCreator<?> builder) {
+    ShardManager(final AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
         this.datastoreContextFactory = builder.getDatastoreContextFactory();
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -162,120 +199,200 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+        shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
-        mBean.registerMBean();
+        shardManagerMBean.registerMBean();
+
+        dataStore = builder.getDistributedDataStore();
+    }
+
+    @Override
+    public void preStart() {
+        LOG.info("Starting ShardManager {}", persistenceId);
     }
 
     @Override
     public void postStop() {
         LOG.info("Stopping ShardManager {}", persistenceId());
 
-        mBean.unregisterMBean();
+        shardManagerMBean.unregisterMBean();
     }
 
     @Override
-    public void handleCommand(Object message) throws Exception {
+    public void handleCommand(final Object message) throws Exception {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
-        } else if(message instanceof FindLocalShard){
+        } else if (message instanceof FindLocalShard) {
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
-        } else if(message instanceof ActorInitialized) {
+        } else if (message instanceof ActorInitialized) {
             onActorInitialized(message);
-        } else if (message instanceof ClusterEvent.MemberUp){
+        } else if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
-        } else if (message instanceof ClusterEvent.MemberExited){
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited) {
             memberExited((ClusterEvent.MemberExited) message);
-        } else if(message instanceof ClusterEvent.MemberRemoved) {
+        } else if (message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
-        } else if(message instanceof ClusterEvent.UnreachableMember) {
-            memberUnreachable((ClusterEvent.UnreachableMember)message);
-        } else if(message instanceof ClusterEvent.ReachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
+            memberUnreachable((ClusterEvent.UnreachableMember) message);
+        } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContextFactory) {
-            onDatastoreContextFactory((DatastoreContextFactory)message);
-        } else if(message instanceof RoleChangeNotification) {
+        } else if (message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory) message);
+        } else if (message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
-        } else if(message instanceof FollowerInitialSyncUpStatus){
+        } else if (message instanceof FollowerInitialSyncUpStatus) {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
-        } else if(message instanceof ShardNotInitializedTimeout) {
-            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof ShardLeaderStateChanged) {
+        } else if (message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+        } else if (message instanceof ShardLeaderStateChanged) {
             onLeaderStateChanged((ShardLeaderStateChanged) message);
-        } else if(message instanceof SwitchShardBehavior){
+        } else if (message instanceof SwitchShardBehavior) {
             onSwitchShardBehavior((SwitchShardBehavior) message);
-        } else if(message instanceof CreateShard) {
+        } else if (message instanceof CreateShard) {
             onCreateShard((CreateShard)message);
-        } else if(message instanceof AddShardReplica){
-            onAddShardReplica((AddShardReplica)message);
-        } else if(message instanceof ForwardedAddServerReply) {
+        } else if (message instanceof AddShardReplica) {
+            onAddShardReplica((AddShardReplica) message);
+        } else if (message instanceof AddPrefixShardReplica) {
+            onAddPrefixShardReplica((AddPrefixShardReplica) message);
+        } else if (message instanceof PrefixShardCreated) {
+            onPrefixShardCreated((PrefixShardCreated) message);
+        } else if (message instanceof PrefixShardRemoved) {
+            onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof InitConfigListener) {
+            onInitConfigListener();
+        } else if (message instanceof ForwardedAddServerReply) {
             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                     msg.removeShardOnFailure);
-        } else if(message instanceof ForwardedAddServerFailure) {
+        } else if (message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof PrimaryShardFoundForContext) {
-            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
-            onPrimaryShardFoundContext(primaryShardFoundContext);
-        } else if(message instanceof RemoveShardReplica) {
+        } else if (message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
-        } else if(message instanceof WrappedShardResponse){
+        } else if (message instanceof RemovePrefixShardReplica) {
+            onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
+        } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
-        } else if(message instanceof GetSnapshot) {
-            onGetSnapshot();
-        } else if(message instanceof ServerRemoved){
+        } else if (message instanceof GetSnapshot) {
+            onGetSnapshot((GetSnapshot) message);
+        } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if(message instanceof SaveSnapshotSuccess) {
-            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if(message instanceof SaveSnapshotFailure) {
-            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
-                    persistenceId(), ((SaveSnapshotFailure) message).cause());
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof ChangeShardMembersVotingStatus) {
+            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+        } else if (message instanceof FlipShardMembersVotingStatus) {
+            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+                    ((SaveSnapshotFailure) message).cause());
+        } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof GetLocalShardIds) {
             onGetLocalShardIds();
+        } else if (message instanceof GetShardRole) {
+            onGetShardRole((GetShardRole) message);
+        } else if (message instanceof RunnableMessage) {
+            ((RunnableMessage)message).run();
+        } else if (message instanceof RegisterForShardAvailabilityChanges) {
+            onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
+        } else if (message instanceof DeleteSnapshotsFailure) {
+            LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
+                    ((DeleteSnapshotsFailure) message).cause());
+        } else if (message instanceof DeleteSnapshotsSuccess) {
+            LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+        } else if (message instanceof ClusterEvent.MemberEvent) {
+            LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
         } else {
             unknownMessage(message);
         }
     }
 
-    private void onShutDown() {
+    private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
+        LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+        final Consumer<String> callback = message.getCallback();
+        shardAvailabilityCallbacks.add(callback);
+
+        getSender().tell(new Status.Success((Registration)
+            () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+    }
+
+    private void onGetShardRole(final GetShardRole message) {
+        LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
+
+        final String name = message.getName();
+
+        final ShardInformation shardInformation = localShards.get(name);
+
+        if (shardInformation == null) {
+            LOG.info("{}: no shard information for {} found", persistenceId(), name);
+            getSender().tell(new Status.Failure(
+                    new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
+            return;
+        }
+
+        getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
+    }
+
+    private void onInitConfigListener() {
+        LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+        final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
+                org.opendaylight.mdsal.common.api.LogicalDatastoreType
+                        .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+        if (configUpdateHandler != null) {
+            configUpdateHandler.close();
+        }
+
+        configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+        configUpdateHandler.initListener(dataStore, datastoreType);
+    }
+
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
 
-                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+                        .getElectionTimeOutInterval().$times(2);
                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
             }
         }
 
         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
 
-        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+                .getDispatcher(Dispatchers.DispatcherType.Client);
         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+            public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
 
                 self().tell(PoisonPill.getInstance(), self());
 
-                if(failure != null) {
+                if (failure != null) {
                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
                 } else {
                     int nfailed = 0;
-                    for(Boolean r: results) {
-                        if(!r) {
+                    for (Boolean result : results) {
+                        if (!result) {
                             nfailed++;
                         }
                     }
 
-                    if(nfailed > 0) {
+                    if (nfailed > 0) {
                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
                     }
                 }
@@ -283,25 +400,25 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, dispatcher);
     }
 
-    private void onWrappedShardResponse(WrappedShardResponse message) {
+    private void onWrappedShardResponse(final WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
                     message.getLeaderPath());
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
-            String leaderPath) {
-        shardReplicaOperationsInProgress.remove(shardId);
+    private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
+            final RemoveServerReply replyMsg, final String leaderPath) {
+        shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
-        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+        LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+            LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
                     shardId.getShardName());
             originalSender.tell(new Status.Success(null), getSelf());
         } else {
-            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
                     persistenceId(), shardId, replyMsg.getStatus());
 
             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
@@ -309,19 +426,54 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
-        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
-                    getSender());
-        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
-            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
-                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+                                          final String primaryPath, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
         }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+        final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+        //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+                primaryPath, shardId);
+
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+        Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+                new RemoveServer(shardId.toString()), removeServerTimeout);
+
+        futureObj.onComplete(new OnComplete<>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
+
+                    LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+                        shardName, failure);
+
+                    // FAILURE
+                    sender.tell(new Status.Failure(new RuntimeException(
+                        String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+                        failure)), self());
+                } else {
+                    // SUCCESS
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
-            final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
+            final String primaryPath, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -332,25 +484,25 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
 
         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
-        LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
                 primaryPath, shardId);
 
-        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
-                duration());
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
-                    String msg = String.format("RemoveServer request to leader %s for shard %s failed",
-                            primaryPath, shardName);
-
-                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    shardReplicaOperationsInProgress.remove(shardName);
+                    LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+                        shardName, failure);
 
                     // FAILURE
-                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                    sender.tell(new Status.Failure(new RuntimeException(
+                        String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+                        failure)), self());
                 } else {
                     // SUCCESS
                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
@@ -359,27 +511,65 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onShardReplicaRemoved(ServerRemoved message) {
-        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
-        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
-        if(shardInformation == null) {
+    private void onShardReplicaRemoved(final ServerRemoved message) {
+        removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeShard(final ShardIdentifier shardId) {
+        final String shardName = shardId.getShardName();
+        final ShardInformation shardInformation = localShards.remove(shardName);
+        if (shardInformation == null) {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
-        } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(Shutdown.INSTANCE, self());
         }
-        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+        final ActorRef shardActor = shardInformation.getActor();
+        if (shardActor != null) {
+            long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
+                    .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+
+            LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+                    timeoutInMS);
+
+            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
+                    FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
+
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
+                @Override
+                public void onComplete(final Throwable failure, final Boolean result) {
+                    if (failure == null) {
+                        LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+                    } else {
+                        LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+                    }
+
+                    self().tell((RunnableMessage) () -> {
+                        // At any rate, invalidate primaryShardInfo cache
+                        primaryShardInfoCache.remove(shardName);
+
+                        shardActorsStopping.remove(shardName);
+                        notifyOnCompleteTasks(failure, result);
+                    }, ActorRef.noSender());
+                }
+            };
+
+            shardActorsStopping.put(shardName, onComplete);
+            stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
+                    .getDispatcher(Dispatchers.DispatcherType.Client));
+        }
+
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
-        for(ShardInformation shardInfo: localShards.values()) {
-            if(!shardInfo.isShardInitialized()) {
-                if(notInitialized == null) {
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardInitialized()) {
+                if (notInitialized == null) {
                     notInitialized = new ArrayList<>();
                 }
 
@@ -387,33 +577,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
-        if(notInitialized != null) {
+        if (notInitialized != null) {
             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
             return;
         }
 
-        byte[] shardManagerSnapshot = null;
-        if(currentSnapshot != null) {
-            shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
-        }
-
         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
-                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+                new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
-        for(ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+        for (ShardInformation shardInfo: localShards.values()) {
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
-    private void onCreateShard(CreateShard createShard) {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void onCreateShard(final CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
-            if(localShards.containsKey(shardName)) {
+            if (localShards.containsKey(shardName)) {
                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
@@ -425,19 +611,102 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             reply = new Status.Failure(e);
         }
 
-        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
             getSender().tell(reply, getSelf());
         }
     }
 
-    private void doCreateShard(CreateShard createShard) {
-        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-        String shardName = moduleShardConfig.getShardName();
+    private void onPrefixShardCreated(final PrefixShardCreated message) {
+        LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
+
+        final PrefixShardConfiguration config = message.getConfiguration();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
+        final String shardName = shardId.getShardName();
+
+        if (isPreviousShardActorStopInProgress(shardName, message)) {
+            return;
+        }
+
+        if (localShards.containsKey(shardName)) {
+            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+            final PrefixShardConfiguration existing =
+                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+            if (existing != null && existing.equals(config)) {
+                // we don't have to do nothing here
+                return;
+            }
+        }
+
+        doCreatePrefixShard(config, shardId, shardName);
+    }
+
+    private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+        final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+        if (stopOnComplete == null) {
+            return false;
+        }
+
+        LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
+                shardName, messageToDefer);
+        final ActorRef sender = getSender();
+        stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable failure, final Boolean result) {
+                LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+                self().tell(messageToDefer, sender);
+            }
+        });
+
+        return true;
+    }
+
+    private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+            final String shardName) {
+        configuration.addPrefixShardConfiguration(config);
+
+        final Builder builder = newShardDatastoreContextBuilder(shardName);
+        builder.logicalStoreType(config.getPrefix().getDatastoreType())
+                .storeRoot(config.getPrefix().getRootIdentifier());
+        DatastoreContext shardDatastoreContext = builder.build();
+
+        final Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        final boolean isActiveMember = true;
+
+        LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, Shard.builder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
+        }
+    }
+
+    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+        LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+        configuration.removePrefixShardConfiguration(prefix);
+        removeShard(shardId);
+    }
+
+    private void doCreateShard(final CreateShard createShard) {
+        final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        final String shardName = moduleShardConfig.getShardName();
 
         configuration.addModuleShardConfiguration(moduleShardConfig);
 
         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-        if(shardDatastoreContext == null) {
+        if (shardDatastoreContext == null) {
             shardDatastoreContext = newShardDatastoreContext(shardName);
         } else {
             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
@@ -446,13 +715,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
-                currentSnapshot.getShardList().contains(shardName);
+        boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+                && currentSnapshot.getShardList().contains(shardName);
 
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
-                contains(cluster.getCurrentMemberName())) {
+        if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+                .contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
@@ -462,8 +731,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
             peerAddresses = Collections.emptyMap();
-            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
-                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
 
         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
@@ -475,21 +744,22 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
-        if(schemaContext != null) {
-            info.setActor(newShardActor(schemaContext, info));
+        if (schemaContext != null) {
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
         }
     }
 
-    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
-        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
-                shardPeerAddressResolver(peerAddressResolver);
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+                .shardPeerAddressResolver(peerAddressResolver);
     }
 
-    private DatastoreContext newShardDatastoreContext(String shardName) {
+    private DatastoreContext newShardDatastoreContext(final String shardName) {
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
-    private void checkReady(){
+    private void checkReady() {
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
@@ -498,15 +768,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+    private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
-            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+            if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
+
+                notifyShardAvailabilityCallbacks(shardInformation);
             }
 
             checkReady();
@@ -515,7 +787,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+    private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
+        shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+    }
+
+    private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
@@ -523,45 +799,45 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
-        if(!shardInfo.isShardInitialized()) {
+        if (!shardInfo.isShardInitialized()) {
             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
         } else {
             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
-            message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
+            message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
         }
     }
 
-    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+    private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
                 status.getName(), status.isInitialSyncDone());
 
         ShardInformation shardInformation = findShardInformation(status.getName());
 
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
 
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+    private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
             checkReady();
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
     }
 
 
-    private ShardInformation findShardInformation(String memberId) {
-        for(ShardInformation info : localShards.values()){
-            if(info.getShardId().toString().equals(memberId)){
+    private ShardInformation findShardInformation(final String memberId) {
+        for (ShardInformation info : localShards.values()) {
+            if (info.getShardId().toString().equals(memberId)) {
                 return info;
             }
         }
@@ -572,7 +848,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReadyWithLeaderId()){
+            if (!info.isShardReadyWithLeaderId()) {
                 isReady = false;
                 break;
             }
@@ -580,34 +856,38 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
-    private boolean isInSync(){
+    private boolean isInSync() {
         for (ShardInformation info : localShards.values()) {
-            if(!info.isInSync()){
+            if (!info.isInSync()) {
                 return false;
             }
         }
         return true;
     }
 
-    private void onActorInitialized(Object message) {
+    private void onActorInitialized(final Object message) {
         final ActorRef sender = getSender();
 
         if (sender == null) {
-            return; //why is a non-actor sending this message? Just ignore.
+            // why is a non-actor sending this message? Just ignore.
+            return;
         }
 
         String actorName = sender.path().name();
         //find shard name from actor name; actor name is stringified shardId
-        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
 
-        if (shardId.getShardName() == null) {
+        final ShardIdentifier shardId;
+        try {
+            shardId = ShardIdentifier.fromShardIdString(actorName);
+        } catch (IllegalArgumentException e) {
+            LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
             return;
         }
 
         markShardAsInitialized(shardId.getShardName());
     }
 
-    private void markShardAsInitialized(String shardName) {
+    private void markShardAsInitialized(final String shardName) {
         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
@@ -619,7 +899,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @Override
-    protected void handleRecover(Object message) throws Exception {
+    protected void handleRecover(final Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
             onRecoveryCompleted();
         } else if (message instanceof SnapshotOffer) {
@@ -627,67 +907,39 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void onRecoveryCompleted() {
         LOG.info("Recovery complete : {}", persistenceId());
 
-        // We no longer persist SchemaContext modules so delete all the prior messages from the akka
-        // journal on upgrade from Helium.
-        deleteMessages(lastSequenceNr());
-
-        if(currentSnapshot == null && restoreFromSnapshot != null &&
-                restoreFromSnapshot.getShardManagerSnapshot() != null) {
-            try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
-                    restoreFromSnapshot.getShardManagerSnapshot()))) {
-                ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+        if (currentSnapshot == null && restoreFromSnapshot != null
+                && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+            ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
 
-                LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+            LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
 
-                applyShardManagerSnapshot(snapshot);
-            } catch(Exception e) {
-                LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
-            }
+            applyShardManagerSnapshot(snapshot);
         }
 
         createLocalShards();
     }
 
-    private void findLocalShard(FindLocalShard message) {
-        final ShardInformation shardInformation = localShards.get(message.getShardName());
-
-        if(shardInformation == null){
-            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
-            return;
-        }
-
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
-            @Override
-            public Object get() {
-                return new LocalShardFound(shardInformation.getActor());
-            }
-        });
-    }
-
-    private void sendResponse(ShardInformation shardInformation, boolean doWait,
-            boolean wantShardReady, final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
-            if(doWait) {
+    private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
+            final boolean wantShardReady, final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+            if (doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
 
-                Runnable replyRunnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        sender.tell(messageSupplier.get(), self);
-                    }
-                };
+                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
 
                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                     new OnShardInitialized(replyRunnable);
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
-                if(shardInformation.isShardInitialized()) {
+                FiniteDuration timeout = shardInformation.getDatastoreContext()
+                        .getShardInitializationTimeout().duration();
+                if (shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
@@ -695,7 +947,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 }
 
                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
-                        shardInformation.getShardName());
+                        shardInformation);
 
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         timeout, getSelf(),
@@ -711,7 +963,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             } else {
                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                         shardInformation.getShardName());
-                getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
+                getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
             }
 
             return;
@@ -720,56 +972,69 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(null, shardId.toString());
-    }
-
-    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
 
-    private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().iterator().next();
+    @VisibleForTesting
+    static MemberName memberToName(final Member member) {
+        return MemberName.forName(member.roles().iterator().next());
+    }
+
+    private void memberRemoved(final ClusterEvent.MemberRemoved message) {
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
-    private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().iterator().next();
+    private void memberExited(final ClusterEvent.MemberExited message) {
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
-    private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().iterator().next();
+    private void memberUp(final ClusterEvent.MemberUp message) {
+        MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
+    private void memberUp(final MemberName memberName, final Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
-    private void addPeerAddress(String memberName, Address address) {
+    private void memberWeaklyUp(final MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
+    private void addPeerAddress(final MemberName memberName, final Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             String shardName = info.getShardName();
             String peerId = getShardIdentifier(memberName, shardName).toString();
             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
@@ -778,40 +1043,42 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+    private void memberReachable(final ClusterEvent.ReachableMember message) {
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
         markMemberAvailable(memberName);
     }
 
-    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().iterator().next();
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+    private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
+        MemberName memberName = memberToName(message.member());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
 
-    private void markMemberUnavailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberUnavailable(final MemberName memberName) {
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
                 info.setLeaderAvailable(false);
 
                 primaryShardInfoCache.remove(info.getShardName());
+
+                notifyShardAvailabilityCallbacks(info);
             }
 
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
-    private void markMemberAvailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberAvailable(final MemberName memberName) {
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
@@ -820,7 +1087,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+    private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
         datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
@@ -862,26 +1129,39 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ActorRef actor = info.getActor();
         if (actor != null) {
             actor.tell(switchBehavior, getSelf());
-          } else {
+        } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
                 info.getShardName(), switchBehavior.getNewState());
         }
     }
 
     /**
-     * Notifies all the local shards of a change in the schema context
+     * Notifies all the local shards of a change in the schema context.
      *
-     * @param message
+     * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
-        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
         for (ShardInformation info : localShards.values()) {
+            info.setSchemaContext(schemaContext);
+
             if (info.getActor() == null) {
                 LOG.debug("Creating Shard {}", info.getShardId());
-                info.setActor(newShardActor(schemaContext, info));
+                info.setActor(newShardActor(info));
+                // Update peer address for every existing peer memeber to avoid missing sending
+                // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
+                String shardName = info.getShardName();
+                for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
+                    String peerId = getShardIdentifier(memberName, shardName).toString() ;
+                    String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
+                    info.updatePeerAddress(peerId, peerAddress, getSelf());
+                    info.peerUp(memberName, peerId, getSelf());
+                    LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
+                            persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+                }
             } else {
                 info.getActor().tell(message, getSelf());
             }
@@ -894,12 +1174,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
-        return getContext().actorOf(info.newProps(schemaContext)
-                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+    protected ActorRef newShardActor(final ShardInformation info) {
+        return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
+                info.getShardId().toString());
     }
 
-    private void findPrimary(FindPrimary message) {
+    private void findPrimary(final FindPrimary message) {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
@@ -908,27 +1188,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null && info.isActiveMember()) {
-            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
-                @Override
-                public Object get() {
-                    String primaryPath = info.getSerializedLeaderActor();
-                    Object found = canReturnLocalShardState && info.isLeader() ?
-                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                            }
-
-                            return found;
-                }
+            sendResponse(info, message.isWaitUntilReady(), true, () -> {
+                String primaryPath = info.getSerializedLeaderActor();
+                Object found = canReturnLocalShardState && info.isLeader()
+                        ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                return found;
             });
 
             return;
         }
 
         final Collection<String> visitedAddresses;
-        if(message instanceof RemoteFindPrimary) {
+        if (message instanceof RemoteFindPrimary) {
             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
         } else {
             visitedAddresses = new ArrayList<>(1);
@@ -936,8 +1210,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
 
-        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
-            if(visitedAddresses.contains(address)) {
+        for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if (visitedAddresses.contains(address)) {
                 continue;
             }
 
@@ -955,62 +1229,95 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
+    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    handler.onFailure(failure);
+                } else {
+                    if (response instanceof RemotePrimaryShardFound) {
+                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+                    } else if (response instanceof LocalPrimaryShardFound) {
+                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+                    } else {
+                        handler.onUnknownResponse(response);
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     /**
      * Construct the name of the shard actor given the name of the member on
-     * which the shard resides and the name of the shard
+     * which the shard resides and the name of the shard.
      *
-     * @param memberName
-     * @param shardName
-     * @return
+     * @param memberName the member name
+     * @param shardName the shard name
+     * @return a b
      */
-    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+    private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
-     * Create shards that are local to the member on which the ShardManager
-     * runs
-     *
+     * Create shards that are local to the member on which the ShardManager runs.
      */
     private void createLocalShards() {
-        String memberName = this.cluster.getCurrentMemberName();
+        MemberName memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
-        if(restoreFromSnapshot != null)
-        {
-            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+        if (restoreFromSnapshot != null) {
+            for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
                 shardSnapshots.put(snapshot.getName(), snapshot);
             }
         }
 
-        restoreFromSnapshot = null; // null out to GC
+        // null out to GC
+        restoreFromSnapshot = null;
 
-        for(String shardName : memberShardNames){
+        for (String shardName : memberShardNames) {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
 
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
 
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
-                        shardSnapshots.get(shardName)), peerAddressResolver));
+            localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardSnapshots));
         }
     }
 
+    @VisibleForTesting
+    ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+                                        final Map<String, String> peerAddresses,
+                                        final DatastoreContext datastoreContext,
+                                        final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
     /**
-     * Given the name of the shard find the addresses of all it's peers
+     * Given the name of the shard find the addresses of all it's peers.
      *
-     * @param shardName
+     * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<String> members = configuration.getMembersFromShardName(shardName);
-        Map<String, String> peerAddresses = new HashMap<>();
+    Map<String, String> getPeerAddresses(final String shardName) {
+        final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+        return getPeerAddresses(shardName, members);
+    }
 
-        String currentMemberName = this.cluster.getCurrentMemberName();
+    private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+        Map<String, String> peerAddresses = new HashMap<>();
+        MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members) {
-            if(!currentMemberName.equals(memberName)) {
+        for (MemberName memberName : members) {
+            if (!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
                 peerAddresses.put(shardId.toString(), address);
@@ -1022,16 +1329,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     @Override
     public SupervisorStrategy supervisorStrategy() {
 
-        return new OneForOneStrategy(10, Duration.create("1 minute"),
-                new Function<Throwable, SupervisorStrategy.Directive>() {
-            @Override
-            public SupervisorStrategy.Directive apply(Throwable t) {
-                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                return SupervisorStrategy.resume();
-            }
-        }
-                );
-
+        return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
+                (Function<Throwable, Directive>) t -> {
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                    return SupervisorStrategy.resume();
+                });
     }
 
     @Override
@@ -1040,65 +1342,145 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    ShardManagerInfoMBean getMBean(){
-        return mBean;
+    ShardManagerInfoMBean getMBean() {
+        return shardManagerMBean;
     }
 
     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
         if (shardReplicaOperationsInProgress.contains(shardName)) {
-            String msg = String.format("A shard replica operation for %s is already in progress", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
+            sender.tell(new Status.Failure(new IllegalStateException(
+                String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
             return true;
         }
 
         return false;
     }
 
-    private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        // Create the localShard
+        if (schemaContext == null) {
+            LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+                persistenceId(), shardName);
+            getSender().tell(new Status.Failure(new IllegalStateException(
+                "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+                getSelf());
+            return;
+        }
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+                        message.getShardPrefix(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
+            }
+
+            @Override
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
+                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+            }
+        });
+    }
+
+    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
-        if (!(this.configuration.isShardConfigured(shardName))) {
-            String msg = String.format("No module configuration exists for shard %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
+        if (!this.configuration.isShardConfigured(shardName)) {
+            LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
+            getSender().tell(new Status.Failure(new IllegalArgumentException(
+                "No module configuration exists for shard " + shardName)), getSelf());
             return;
         }
 
         // Create the localShard
         if (schemaContext == null) {
-            String msg = String.format(
-                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+                persistenceId(), shardName);
+            getSender().tell(new Status.Failure(new IllegalStateException(
+                "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+                getSelf());
             return;
         }
 
-        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                final RunnableMessage runnable = (RunnableMessage) () ->
+                    addShard(getShardName(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
-
         });
     }
 
-    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
-        String msg = String.format("Local shard %s already exists", shardName);
-        LOG.debug ("{}: {}", persistenceId(), msg);
-        sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
+        LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
+        sender.tell(new Status.Failure(new AlreadyExistsException(
+            String.format("Local shard %s already exists", shardName))), getSelf());
+    }
+
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+                                final RemotePrimaryShardFound response, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if (existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+            DatastoreContext datastoreContext = builder.build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
+
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -1107,42 +1489,53 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo;
         final boolean removeShardOnFailure;
         ShardInformation existingShardInfo = localShards.get(shardName);
-        if(existingShardInfo == null) {
+        if (existingShardInfo == null) {
             removeShardOnFailure = true;
             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
-                    DisableElectionsRaftPolicy.class.getName()).build();
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
 
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
             localShards.put(shardName, shardInfo);
-            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            shardInfo.setActor(newShardActor(shardInfo));
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
         }
 
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+    }
+
+    private void execAddShard(final String shardName,
+                              final ShardInformation shardInfo,
+                              final RemotePrimaryShardFound response,
+                              final boolean removeShardOnFailure,
+                              final ActorRef sender) {
+
+        final String localShardAddress =
+                peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
-        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                 response.getPrimaryPath(), shardInfo.getShardId());
 
-        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
-                duration());
-        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+        final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+                .getShardLeaderElectionTimeout().duration());
+        final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+                new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
-            public void onComplete(Throwable failure, Object addServerResponse) {
+            public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
-                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                    final String msg = String.format("AddServer request to leader %s for shard %s failed",
                             response.getPrimaryPath(), shardName);
                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
@@ -1153,11 +1546,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
-            boolean removeShardOnFailure) {
+    private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
+            final ActorRef sender, final boolean removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
-        if(removeShardOnFailure) {
+        if (removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
@@ -1168,15 +1561,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             new RuntimeException(message, failure)), getSelf());
     }
 
-    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
-            String leaderPath, boolean removeShardOnFailure) {
+    private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
+            final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
-        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+        LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+            LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
@@ -1184,56 +1577,82 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             persistShardList();
 
             sender.tell(new Status.Success(null), getSelf());
-        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+        } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
             sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
-            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+                    shardInfo.getShardId());
 
             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
-    private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
-                                               String leaderPath, ShardIdentifier shardId) {
-        Exception failure;
+    private static Exception getServerChangeException(final Class<?> serverChange,
+            final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
         switch (serverChangeStatus) {
             case TIMEOUT:
-                failure = new TimeoutException(String.format(
-                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
-                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                        leaderPath, shardId.getShardName()));
-                break;
+                return new TimeoutException(String.format(
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+                        + "Possible causes - there was a problem replicating the data or shard leadership changed "
+                        + "while replicating the shard data", leaderPath, shardId.getShardName()));
             case NO_LEADER:
-                failure = createNoShardLeaderException(shardId);
-                break;
+                return new NoShardLeaderException(shardId);
             case NOT_SUPPORTED:
-                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
                         serverChange.getSimpleName(), shardId.getShardName()));
-                break;
             default :
-                failure = new RuntimeException(String.format(
-                        "%s request to leader %s for shard %s failed with status %s",
+                return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
-        return failure;
     }
 
-    private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
-                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
+            }
+        });
+    }
+
+    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+                shardName, persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            @Override
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
             }
         });
     }
@@ -1245,27 +1664,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardList.remove(shardInfo.getShardName());
             }
         }
-        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(updateShardManagerSnapshot(shardList));
+        LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
     }
 
-    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
-        currentSnapshot = new ShardManagerSnapshot(shardList);
+    private ShardManagerSnapshot updateShardManagerSnapshot(
+            final List<String> shardList,
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+        currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
         return currentSnapshot;
     }
 
-    private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+    private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
         currentSnapshot = snapshot;
 
-        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+        LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
-        String currentMember = cluster.getCurrentMemberName();
+        final MemberName currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
         for (String shard : currentSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
-                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                LOG.debug("{}: adding shard {}", persistenceId(), shard);
                 configuration.addMemberReplicaForShard(shard, currentMember);
             } else {
                 configuredShardList.remove(shard);
@@ -1273,26 +1694,181 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
         for (String shard : configuredShardList) {
             // remove the member as a replica for the shard
-            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            LOG.debug("{}: removing shard {}", persistenceId(), shard);
             configuration.removeMemberReplicaForShard(shard, currentMember);
         }
     }
 
-    private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
-        LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+    private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
+        LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
             0, 0));
     }
 
+    private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+        LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+        String shardName = changeMembersVotingStatus.getShardName();
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+            serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+                    e.getValue());
+        }
+
+        ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+        findLocalShard(shardName, getSender(),
+            localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+            localShardFound.getPath(), getSender()));
+    }
+
+    private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
+        LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+        ActorRef sender = getSender();
+        final String shardName = flipMembersVotingStatus.getShardName();
+        findLocalShard(shardName, sender, localShardFound -> {
+            Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+                    Timeout.apply(30, TimeUnit.SECONDS));
+
+            future.onComplete(new OnComplete<>() {
+                @Override
+                public void onComplete(final Throwable failure, final Object response) {
+                    if (failure != null) {
+                        sender.tell(new Status.Failure(new RuntimeException(
+                                String.format("Failed to access local shard %s", shardName), failure)), self());
+                        return;
+                    }
+
+                    OnDemandRaftState raftState = (OnDemandRaftState) response;
+                    Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+                    for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                        serverVotingStatusMap.put(e.getKey(), !e.getValue());
+                    }
+
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+                            .toString(), !raftState.isVoting());
+
+                    changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+                            shardName, localShardFound.getPath(), sender);
+                }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        });
+
+    }
+
+    private void findLocalShard(final FindLocalShard message) {
+        LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
+        final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+        if (shardInformation == null) {
+            LOG.debug("{}: Local shard {} not found - shards present: {}",
+                    persistenceId(), message.getShardName(), localShards.keySet());
+
+            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+            return;
+        }
+
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
+            () -> new LocalShardFound(shardInformation.getActor()));
+    }
+
+    private void findLocalShard(final String shardName, final ActorRef sender,
+            final Consumer<LocalShardFound> onLocalShardFound) {
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+        futureObj.onComplete(new OnComplete<>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+                            failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                            String.format("Failed to find local shard %s", shardName), failure)), self());
+                } else {
+                    if (response instanceof LocalShardFound) {
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
+                                sender);
+                    } else if (response instanceof LocalShardNotFound) {
+                        LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
+                        sender.tell(new Status.Failure(new IllegalArgumentException(
+                            String.format("Local shard %s does not exist", shardName))), self());
+                    } else {
+                        LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
+                            response);
+                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+                                : new RuntimeException(
+                                    String.format("Failed to find local shard %s: received response: %s", shardName,
+                                        response))), self());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
+            final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+        LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+                changeServersVotingStatus, shardActorRef.path());
+
+        Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+        Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+        futureObj.onComplete(new OnComplete<>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                shardReplicaOperationsInProgress.remove(shardName);
+                if (failure != null) {
+                    LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
+                        shardActorRef.path(), failure);
+                    sender.tell(new Status.Failure(new RuntimeException(
+                        String.format("ChangeServersVotingStatus request to local shard %s failed",
+                            shardActorRef.path()), failure)), self());
+                } else {
+                    LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+                    ServerChangeReply replyMsg = (ServerChangeReply) response;
+                    if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                        sender.tell(new Status.Success(null), getSelf());
+                    } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                        sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+                                "The requested voting state change for shard %s is invalid. At least one member "
+                                + "must be voting", shardId.getShardName()))), getSelf());
+                    } else {
+                        LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                                persistenceId(), shardName, replyMsg.getStatus());
+
+                        Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+                                replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+                        sender.tell(new Status.Failure(error), getSelf());
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private static final class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
         String leaderPath;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
+            final String leaderPath, final boolean removeShardOnFailure) {
             this.shardInfo = shardInfo;
             this.addServerReply = addServerReply;
             this.leaderPath = leaderPath;
@@ -1306,8 +1882,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Throwable failure;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
+                final boolean removeShardOnFailure) {
             this.shardName = shardName;
             this.failureMessage = failureMessage;
             this.failure = failure;
@@ -1319,7 +1895,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
-        OnShardInitialized(Runnable replyRunnable) {
+        OnShardInitialized(final Runnable replyRunnable) {
             this.replyRunnable = replyRunnable;
         }
 
@@ -1331,95 +1907,78 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return timeoutSchedule;
         }
 
-        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+        void setTimeoutSchedule(final Cancellable timeoutSchedule) {
             this.timeoutSchedule = timeoutSchedule;
         }
     }
 
     static class OnShardReady extends OnShardInitialized {
-        OnShardReady(Runnable replyRunnable) {
+        OnShardReady(final Runnable replyRunnable) {
             super(replyRunnable);
         }
     }
 
-    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
-        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
-                getShardInitializationTimeout().duration().$times(2));
-
-
-        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(Throwable failure, Object response) {
-                if (failure != null) {
-                    handler.onFailure(failure);
-                } else {
-                    if(response instanceof RemotePrimaryShardFound) {
-                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
-                    } else if(response instanceof LocalPrimaryShardFound) {
-                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
-                    } else {
-                        handler.onUnknownResponse(response);
-                    }
-                }
-            }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    private interface RunnableMessage extends Runnable {
     }
 
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
-     * a remote or local find primary message is processed
+     * a remote or local find primary message is processed.
      */
-    private static interface FindPrimaryResponseHandler {
+    private interface FindPrimaryResponseHandler {
         /**
-         * Invoked when a Failure message is received as a response
+         * Invoked when a Failure message is received as a response.
          *
-         * @param failure
+         * @param failure the failure exception
          */
         void onFailure(Throwable failure);
 
         /**
-         * Invoked when a RemotePrimaryShardFound response is received
+         * Invoked when a RemotePrimaryShardFound response is received.
          *
-         * @param response
+         * @param response the response
          */
         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
 
         /**
-         * Invoked when a LocalPrimaryShardFound response is received
-         * @param response
+         * Invoked when a LocalPrimaryShardFound response is received.
+         *
+         * @param response the response
          */
         void onLocalPrimaryFound(LocalPrimaryShardFound response);
 
         /**
          * Invoked when an unknown response is received. This is another type of failure.
          *
-         * @param response
+         * @param response the response
          */
         void onUnknownResponse(Object response);
     }
 
     /**
      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
-     * replica and sends a wrapped Failure response to some targetActor
+     * replica and sends a wrapped Failure response to some targetActor.
      */
-    private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+    private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
         private final ActorRef targetActor;
         private final String shardName;
         private final String persistenceId;
         private final ActorRef shardManagerActor;
 
         /**
+         * Constructs an instance.
+         *
          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
          * @param shardName The name of the shard for which the primary replica had to be found
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
-        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
-            this.targetActor = Preconditions.checkNotNull(targetActor);
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.persistenceId = Preconditions.checkNotNull(persistenceId);
-            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+        protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
+                final String persistenceId, final ActorRef shardManagerActor) {
+            this.targetActor = requireNonNull(targetActor);
+            this.shardName = requireNonNull(shardName);
+            this.persistenceId = requireNonNull(persistenceId);
+            this.shardManagerActor = requireNonNull(shardManagerActor);
         }
 
         public ActorRef getTargetActor() {
@@ -1431,65 +1990,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         @Override
-        public void onFailure(Throwable failure) {
-            LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+        public void onFailure(final Throwable failure) {
+            LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
             targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
         @Override
-        public void onUnknownResponse(Object response) {
-            String msg = String.format("Failed to find leader for shard %s: received response: %s",
-                    shardName, response);
-            LOG.debug ("{}: {}", persistenceId, msg);
-            targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
-                    new RuntimeException(msg)), shardManagerActor);
-        }
-    }
-
-    /**
-     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
-     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
-     * as a successful response to find primary.
-     */
-    private static class PrimaryShardFoundForContext {
-        private final String shardName;
-        private final Object contextMessage;
-        private final RemotePrimaryShardFound remotePrimaryShardFound;
-        private final LocalPrimaryShardFound localPrimaryShardFound;
-
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
-                @Nonnull Object primaryFoundMessage) {
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.contextMessage = Preconditions.checkNotNull(contextMessage);
-            Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
-                    (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
-                    (LocalPrimaryShardFound) primaryFoundMessage : null;
-        }
-
-        @Nonnull
-        String getPrimaryPath(){
-            if(remotePrimaryShardFound != null) {
-                return remotePrimaryShardFound.getPrimaryPath();
-            }
-            return localPrimaryShardFound.getPrimaryPath();
-        }
-
-        @Nonnull
-        Object getContextMessage() {
-            return contextMessage;
-        }
-
-        @Nullable
-        RemotePrimaryShardFound getRemotePrimaryShardFound() {
-            return remotePrimaryShardFound;
-        }
-
-        @Nonnull
-        String getShardName() {
-            return shardName;
+        public void onUnknownResponse(final Object response) {
+            LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
+                response);
+            targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+                    : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
+                        shardName, response))), shardManagerActor);
         }
     }
 
@@ -1501,7 +2014,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Object response;
         private final String leaderPath;
 
-        WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+        WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
@@ -1525,7 +2038,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardInformation shardInfo;
         private final OnShardInitialized onShardInitialized;
 
-        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+        ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
+            final ActorRef sender) {
             this.sender = sender;
             this.shardInfo = shardInfo;
             this.onShardInitialized = onShardInitialized;