Bug 8380: Fix unhandled messages in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 8386e669b83f46cb102771f7c12f1b9458f5e485..d3d8ce39c9a108f0b497b942f9e6e5368f3c6a21 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
@@ -24,6 +25,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.pattern.Patterns;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -32,8 +35,6 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,25 +49,27 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
@@ -75,12 +78,17 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
@@ -97,6 +105,15 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,16 +123,15 @@ import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
  * <ul>
  * <li> Create all the local shard replicas that belong on this cluster member
  * <li> Find the address of the local shard
  * <li> Find the primary replica for any given shard
  * <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
  */
 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a shard name and it's corresponding information
@@ -133,7 +149,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private final ShardManagerInfo mBean;
+    private final ShardManagerInfo shardManagerMBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -151,7 +167,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
+    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+
     private final String persistenceId;
+    private final AbstractDataStore dataStore;
+
+    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+    private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     ShardManager(AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
@@ -160,7 +182,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -172,105 +194,154 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+        shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
-        mBean.registerMBean();
+        shardManagerMBean.registerMBean();
+
+        dataStore = builder.getDistributedDataStore();
+    }
+
+    @Override
+    public void preStart() {
+        LOG.info("Starting ShardManager {}", persistenceId);
     }
 
     @Override
     public void postStop() {
         LOG.info("Stopping ShardManager {}", persistenceId());
 
-        mBean.unregisterMBean();
+        shardManagerMBean.unregisterMBean();
+
+        if (configListenerReg != null) {
+            configListenerReg.close();
+            configListenerReg = null;
+        }
     }
 
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
-        } else if(message instanceof FindLocalShard){
+        } else if (message instanceof FindLocalShard) {
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
-        } else if(message instanceof ActorInitialized) {
+        } else if (message instanceof ActorInitialized) {
             onActorInitialized(message);
-        } else if (message instanceof ClusterEvent.MemberUp){
+        } else if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
-        } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
-        } else if (message instanceof ClusterEvent.MemberExited){
+        } else if (message instanceof ClusterEvent.MemberExited) {
             memberExited((ClusterEvent.MemberExited) message);
-        } else if(message instanceof ClusterEvent.MemberRemoved) {
+        } else if (message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
-        } else if(message instanceof ClusterEvent.UnreachableMember) {
-            memberUnreachable((ClusterEvent.UnreachableMember)message);
-        } else if(message instanceof ClusterEvent.ReachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
+            memberUnreachable((ClusterEvent.UnreachableMember) message);
+        } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContextFactory) {
-            onDatastoreContextFactory((DatastoreContextFactory)message);
-        } else if(message instanceof RoleChangeNotification) {
+        } else if (message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory) message);
+        } else if (message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
-        } else if(message instanceof FollowerInitialSyncUpStatus){
+        } else if (message instanceof FollowerInitialSyncUpStatus) {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
-        } else if(message instanceof ShardNotInitializedTimeout) {
-            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof ShardLeaderStateChanged) {
+        } else if (message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+        } else if (message instanceof ShardLeaderStateChanged) {
             onLeaderStateChanged((ShardLeaderStateChanged) message);
-        } else if(message instanceof SwitchShardBehavior){
+        } else if (message instanceof SwitchShardBehavior) {
             onSwitchShardBehavior((SwitchShardBehavior) message);
-        } else if(message instanceof CreateShard) {
+        } else if (message instanceof CreateShard) {
             onCreateShard((CreateShard)message);
-        } else if(message instanceof AddShardReplica){
-            onAddShardReplica((AddShardReplica)message);
-        } else if(message instanceof ForwardedAddServerReply) {
+        } else if (message instanceof AddShardReplica) {
+            onAddShardReplica((AddShardReplica) message);
+        } else if (message instanceof AddPrefixShardReplica) {
+            onAddPrefixShardReplica((AddPrefixShardReplica) message);
+        } else if (message instanceof PrefixShardCreated) {
+            onPrefixShardCreated((PrefixShardCreated) message);
+        } else if (message instanceof PrefixShardRemoved) {
+            onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof InitConfigListener) {
+            onInitConfigListener();
+        } else if (message instanceof ForwardedAddServerReply) {
             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                     msg.removeShardOnFailure);
-        } else if(message instanceof ForwardedAddServerFailure) {
+        } else if (message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof RemoveShardReplica) {
+        } else if (message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
-        } else if(message instanceof WrappedShardResponse){
+        } else if (message instanceof RemovePrefixShardReplica) {
+            onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
+        } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
-        } else if(message instanceof GetSnapshot) {
+        } else if (message instanceof GetSnapshot) {
             onGetSnapshot();
-        } else if(message instanceof ServerRemoved){
+        } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if(message instanceof ChangeShardMembersVotingStatus){
+        } else if (message instanceof ChangeShardMembersVotingStatus) {
             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
-        } else if(message instanceof FlipShardMembersVotingStatus){
+        } else if (message instanceof FlipShardMembersVotingStatus) {
             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
-        } else if(message instanceof SaveSnapshotSuccess) {
-            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if(message instanceof SaveSnapshotFailure) {
-            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
-                    persistenceId(), ((SaveSnapshotFailure) message).cause());
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+                    ((SaveSnapshotFailure) message).cause());
+        } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof GetLocalShardIds) {
             onGetLocalShardIds();
-        } else if(message instanceof RunnableMessage) {
+        } else if (message instanceof RunnableMessage) {
             ((RunnableMessage)message).run();
+        } else if (message instanceof DeleteSnapshotsFailure) {
+            LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
+                    ((DeleteSnapshotsFailure) message).cause());
+        } else if (message instanceof DeleteSnapshotsSuccess) {
+            LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+        } else if (message instanceof ClusterEvent.MemberEvent) {
+            LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onInitConfigListener() {
+        LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+        final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+                org.opendaylight.mdsal.common.api.LogicalDatastoreType
+                        .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+        if (configUpdateHandler != null) {
+            configUpdateHandler.close();
+        }
+
+        configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+        configUpdateHandler.initListener(dataStore, type);
+    }
+
     private void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
 
-                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+                        .getElectionTimeOutInterval().$times(2);
                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
             }
         }
 
         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
 
-        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+                .getDispatcher(Dispatchers.DispatcherType.Client);
         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
@@ -280,17 +351,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 self().tell(PoisonPill.getInstance(), self());
 
-                if(failure != null) {
+                if (failure != null) {
                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
                 } else {
                     int nfailed = 0;
-                    for(Boolean r: results) {
-                        if(!r) {
+                    for (Boolean result : results) {
+                        if (!result) {
                             nfailed++;
                         }
                     }
 
-                    if(nfailed > 0) {
+                    if (nfailed > 0) {
                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
                     }
                 }
@@ -309,14 +380,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
-        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+        LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+            LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
                     shardId.getShardName());
             originalSender.tell(new Status.Success(null), getSelf());
         } else {
-            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
                     persistenceId(), shardId, replyMsg.getStatus());
 
             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
@@ -324,9 +395,49 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+                                          final String primaryPath, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+        final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+        //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+                primaryPath, shardId);
+
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+        Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+                new RemoveServer(shardId.toString()), removeServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    shardReplicaOperationsInProgress.remove(shardName);
+                    String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+                            primaryPath, shardName);
+
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+                    // FAILURE
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    // SUCCESS
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
             final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -337,11 +448,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
 
         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
-        LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
                 primaryPath, shardId);
 
-        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
-                duration());
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
@@ -353,7 +463,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
-                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
 
                     // FAILURE
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
@@ -366,16 +476,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onShardReplicaRemoved(ServerRemoved message) {
-        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
-        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
-        if(shardInformation == null) {
+        removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeShard(final ShardIdentifier shardId) {
+        final String shardName = shardId.getShardName();
+        final ShardInformation shardInformation = localShards.remove(shardName);
+        if (shardInformation == null) {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
-        } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(Shutdown.INSTANCE, self());
         }
-        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+        final ActorRef shardActor = shardInformation.getActor();
+        if (shardActor != null) {
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
+            FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
+                    .getElectionTimeOutInterval().$times(3);
+            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
+            shardActorStoppingFutures.put(shardName, stopFuture);
+            stopFuture.onComplete(new OnComplete<Boolean>() {
+                @Override
+                public void onComplete(Throwable failure, Boolean result) {
+                    if (failure == null) {
+                        LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+                    } else {
+                        LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+                    }
+
+                    self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
+                            ActorRef.noSender());
+                }
+            }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+        }
+
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
         persistShardList();
     }
 
@@ -383,9 +518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
-        for(ShardInformation shardInfo: localShards.values()) {
-            if(!shardInfo.isShardInitialized()) {
-                if(notInitialized == null) {
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardInitialized()) {
+                if (notInitialized == null) {
                     notInitialized = new ArrayList<>();
                 }
 
@@ -393,33 +528,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
-        if(notInitialized != null) {
+        if (notInitialized != null) {
             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
             return;
         }
 
-        byte[] shardManagerSnapshot = null;
-        if(currentSnapshot != null) {
-            shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
-        }
-
         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
-                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+                new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
-        for(ShardInformation shardInfo: localShards.values()) {
+        for (ShardInformation shardInfo: localShards.values()) {
             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreateShard(CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
-            if(localShards.containsKey(shardName)) {
+            if (localShards.containsKey(shardName)) {
                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
@@ -431,19 +562,100 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             reply = new Status.Failure(e);
         }
 
-        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
             getSender().tell(reply, getSelf());
         }
     }
 
-    private void doCreateShard(CreateShard createShard) {
-        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-        String shardName = moduleShardConfig.getShardName();
+    private void onPrefixShardCreated(final PrefixShardCreated message) {
+        LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
+
+        final PrefixShardConfiguration config = message.getConfiguration();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
+        final String shardName = shardId.getShardName();
+
+        if (isPreviousShardActorStopInProgress(shardName, message)) {
+            return;
+        }
+
+        if (localShards.containsKey(shardName)) {
+            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+            final PrefixShardConfiguration existing =
+                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+            if (existing != null && existing.equals(config)) {
+                // we don't have to do nothing here
+                return;
+            }
+        }
+
+        doCreatePrefixShard(config, shardId, shardName);
+    }
+
+    private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+        final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
+        if (stopFuture == null) {
+            return false;
+        }
+
+        LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+                shardName, messageToDefer);
+        final ActorRef sender = getSender();
+        stopFuture.onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(Throwable failure, Boolean result) {
+                LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+                self().tell(messageToDefer, sender);
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+        return true;
+    }
+
+    private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
+        configuration.addPrefixShardConfiguration(config);
+
+        final Builder builder = newShardDatastoreContextBuilder(shardName);
+        builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                .storeRoot(config.getPrefix().getRootIdentifier());
+        DatastoreContext shardDatastoreContext = builder.build();
+
+        final Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        final boolean isActiveMember = true;
+
+        LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, Shard.builder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+        LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+        configuration.removePrefixShardConfiguration(prefix);
+        removeShard(shardId);
+    }
+
+    private void doCreateShard(final CreateShard createShard) {
+        final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        final String shardName = moduleShardConfig.getShardName();
 
         configuration.addModuleShardConfiguration(moduleShardConfig);
 
         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-        if(shardDatastoreContext == null) {
+        if (shardDatastoreContext == null) {
             shardDatastoreContext = newShardDatastoreContext(shardName);
         } else {
             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
@@ -452,13 +664,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
-                currentSnapshot.getShardList().contains(shardName);
+        boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+                && currentSnapshot.getShardList().contains(shardName);
 
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
-                contains(cluster.getCurrentMemberName())) {
+        if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+                .contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
@@ -468,8 +680,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
             peerAddresses = Collections.emptyMap();
-            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
-                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
 
         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
@@ -481,21 +693,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
-        if(schemaContext != null) {
+        if (schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
     }
 
     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
-        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
-                shardPeerAddressResolver(peerAddressResolver);
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+                .shardPeerAddressResolver(peerAddressResolver);
     }
 
     private DatastoreContext newShardDatastoreContext(String shardName) {
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
-    private void checkReady(){
+    private void checkReady() {
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
@@ -508,10 +720,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
-            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+            if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
             }
 
@@ -529,7 +741,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
-        if(!shardInfo.isShardInitialized()) {
+        if (!shardInfo.isShardInitialized()) {
             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
         } else {
@@ -544,10 +756,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardInformation shardInformation = findShardInformation(status.getName());
 
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
 
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
 
     }
@@ -557,17 +769,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
-        if(shardInformation != null) {
+        if (shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
             checkReady();
-            mBean.setSyncStatus(isInSync());
+            shardManagerMBean.setSyncStatus(isInSync());
         }
     }
 
 
     private ShardInformation findShardInformation(String memberId) {
-        for(ShardInformation info : localShards.values()){
-            if(info.getShardId().toString().equals(memberId)){
+        for (ShardInformation info : localShards.values()) {
+            if (info.getShardId().toString().equals(memberId)) {
                 return info;
             }
         }
@@ -578,7 +790,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReadyWithLeaderId()){
+            if (!info.isShardReadyWithLeaderId()) {
                 isReady = false;
                 break;
             }
@@ -586,9 +798,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
-    private boolean isInSync(){
+    private boolean isInSync() {
         for (ShardInformation info : localShards.values()) {
-            if(!info.isInSync()){
+            if (!info.isInSync()) {
                 return false;
             }
         }
@@ -636,45 +848,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void onRecoveryCompleted() {
         LOG.info("Recovery complete : {}", persistenceId());
 
-        // We no longer persist SchemaContext modules so delete all the prior messages from the akka
-        // journal on upgrade from Helium.
-        deleteMessages(lastSequenceNr());
-
-        if(currentSnapshot == null && restoreFromSnapshot != null &&
-                restoreFromSnapshot.getShardManagerSnapshot() != null) {
-            try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
-                    restoreFromSnapshot.getShardManagerSnapshot()))) {
-                ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+        if (currentSnapshot == null && restoreFromSnapshot != null
+                && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+            ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
 
-                LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+            LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
 
-                applyShardManagerSnapshot(snapshot);
-            } catch(Exception e) {
-                LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
-            }
+            applyShardManagerSnapshot(snapshot);
         }
 
         createLocalShards();
     }
 
-    private void findLocalShard(FindLocalShard message) {
-        final ShardInformation shardInformation = localShards.get(message.getShardName());
-
-        if(shardInformation == null){
-            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
-            return;
-        }
-
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
-    }
-
     private void sendResponse(ShardInformation shardInformation, boolean doWait,
             boolean wantShardReady, final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
-            if(doWait) {
+        if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+            if (doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
 
@@ -685,8 +878,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
-                if(shardInformation.isShardInitialized()) {
+                FiniteDuration timeout = shardInformation.getDatastoreContext()
+                        .getShardInitializationTimeout().duration();
+                if (shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
@@ -741,7 +935,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
@@ -754,7 +948,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         peerAddressResolver.removePeerAddress(memberName);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
@@ -785,7 +979,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
-        for(ShardInformation info : localShards.values()){
+        for (ShardInformation info : localShards.values()) {
             String shardName = info.getShardName();
             String peerId = getShardIdentifier(memberName, shardName).toString();
             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
@@ -882,16 +1076,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ActorRef actor = info.getActor();
         if (actor != null) {
             actor.tell(switchBehavior, getSelf());
-          } else {
+        } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
                 info.getShardName(), switchBehavior.getNewState());
         }
     }
 
     /**
-     * Notifies all the local shards of a change in the schema context
+     * Notifies all the local shards of a change in the schema context.
      *
-     * @param message
+     * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
@@ -914,9 +1108,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
-        return getContext().actorOf(info.newProps(schemaContext)
-                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+    protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
+        return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+                info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
@@ -930,22 +1124,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (info != null && info.isActiveMember()) {
             sendResponse(info, message.isWaitUntilReady(), true, () -> {
                 String primaryPath = info.getSerializedLeaderActor();
-                Object found = canReturnLocalShardState && info.isLeader() ?
-                        new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                Object found = canReturnLocalShardState && info.isLeader()
+                        new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                        }
-
-                        return found;
+                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                return found;
             });
 
             return;
         }
 
         final Collection<String> visitedAddresses;
-        if(message instanceof RemoteFindPrimary) {
+        if (message instanceof RemoteFindPrimary) {
             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
         } else {
             visitedAddresses = new ArrayList<>(1);
@@ -953,8 +1144,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
 
-        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
-            if(visitedAddresses.contains(address)) {
+        for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if (visitedAddresses.contains(address)) {
                 continue;
             }
 
@@ -972,38 +1163,58 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
+    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
+
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    handler.onFailure(failure);
+                } else {
+                    if (response instanceof RemotePrimaryShardFound) {
+                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+                    } else if (response instanceof LocalPrimaryShardFound) {
+                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+                    } else {
+                        handler.onUnknownResponse(response);
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
     /**
      * Construct the name of the shard actor given the name of the member on
-     * which the shard resides and the name of the shard
+     * which the shard resides and the name of the shard.
      *
-     * @param memberName
-     * @param shardName
-     * @return
+     * @param memberName the member name
+     * @param shardName the shard name
+     * @return a b
      */
-    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
+    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
-     * Create shards that are local to the member on which the ShardManager
-     * runs
-     *
+     * Create shards that are local to the member on which the ShardManager runs.
      */
     private void createLocalShards() {
         MemberName memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
-        if(restoreFromSnapshot != null)
-        {
-            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+        if (restoreFromSnapshot != null) {
+            for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
                 shardSnapshots.put(snapshot.getName(), snapshot);
             }
         }
 
         restoreFromSnapshot = null; // null out to GC
 
-        for(String shardName : memberShardNames){
+        for (String shardName : memberShardNames) {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
 
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
@@ -1016,14 +1227,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     /**
-     * Given the name of the shard find the addresses of all it's peers
+     * Given the name of the shard find the addresses of all it's peers.
      *
-     * @param shardName
+     * @param shardName the shard name
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
-        Map<String, String> peerAddresses = new HashMap<>();
+        final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+        return getPeerAddresses(shardName, members);
+    }
 
+    private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+        Map<String, String> peerAddresses = new HashMap<>();
         MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
         for (MemberName memberName : members) {
@@ -1043,9 +1257,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 (Function<Throwable, Directive>) t -> {
                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
                     return SupervisorStrategy.resume();
-                }
-                );
-
+                });
     }
 
     @Override
@@ -1054,14 +1266,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    ShardManagerInfoMBean getMBean(){
-        return mBean;
+    ShardManagerInfoMBean getMBean() {
+        return shardManagerMBean;
     }
 
     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
         if (shardReplicaOperationsInProgress.contains(shardName)) {
             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
+            LOG.debug("{}: {}", persistenceId(), msg);
             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return true;
         }
@@ -1069,15 +1281,49 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return false;
     }
 
-    private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        // Create the localShard
+        if (schemaContext == null) {
+            String msg = String.format(
+                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+                        message.getShardPrefix(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+            }
+        });
+    }
+
+    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
-        if (!(this.configuration.isShardConfigured(shardName))) {
+        if (!this.configuration.isShardConfigured(shardName)) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
+            LOG.debug("{}: {}", persistenceId(), msg);
             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
@@ -1086,33 +1332,70 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (schemaContext == null) {
             String msg = String.format(
                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
+            LOG.debug("{}: {}", persistenceId(), msg);
             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
-        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+                getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
+                final RunnableMessage runnable = (RunnableMessage) () ->
+                    addShard(getShardName(), response, getSender());
+                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+                    getSelf().tell(runnable, getTargetActor());
+                }
             }
 
             @Override
             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
-
         });
     }
 
     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
-        LOG.debug ("{}: {}", persistenceId(), msg);
+        LOG.debug("{}: {}", persistenceId(), msg);
         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
     }
 
+    private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+                                final RemotePrimaryShardFound response, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if (existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+            DatastoreContext datastoreContext = builder.build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
+
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+    }
+
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -1121,12 +1404,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo;
         final boolean removeShardOnFailure;
         ShardInformation existingShardInfo = localShards.get(shardName);
-        if(existingShardInfo == null) {
+        if (existingShardInfo == null) {
             removeShardOnFailure = true;
             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
-                    DisableElectionsRaftPolicy.class.getName()).build();
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
 
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
@@ -1138,25 +1421,35 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = existingShardInfo;
         }
 
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+    }
+
+    private void execAddShard(final String shardName,
+                              final ShardInformation shardInfo,
+                              final RemotePrimaryShardFound response,
+                              final boolean removeShardOnFailure,
+                              final ActorRef sender) {
+
+        final String localShardAddress =
+                peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
-        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                 response.getPrimaryPath(), shardInfo.getShardId());
 
-        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
-                duration());
-        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+        final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+                .getShardLeaderElectionTimeout().duration());
+        final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+                new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
                 if (failure != null) {
-                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                    final String msg = String.format("AddServer request to leader %s for shard %s failed",
                             response.getPrimaryPath(), shardName);
                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
@@ -1171,7 +1464,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             boolean removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
-        if(removeShardOnFailure) {
+        if (removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
@@ -1187,10 +1480,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
-        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+        LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+            LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
@@ -1198,13 +1491,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             persistShardList();
 
             sender.tell(new Status.Success(null), getSelf());
-        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+        } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
             sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
-            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+            LOG.warn("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+                    shardInfo.getShardId());
 
             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
@@ -1216,9 +1510,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         switch (serverChangeStatus) {
             case TIMEOUT:
                 failure = new TimeoutException(String.format(
-                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
-                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                        leaderPath, shardId.getShardName()));
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+                        + "Possible causes - there was a problem replicating the data or shard leadership changed "
+                        + "while replicating the shard data", leaderPath, shardId.getShardName()));
                 break;
             case NO_LEADER:
                 failure = createNoShardLeaderException(shardId);
@@ -1235,7 +1529,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return failure;
     }
 
-    private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
@@ -1251,7 +1545,34 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             private void doRemoveShardReplicaAsync(final String primaryPath) {
-                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
+            }
+        });
+    }
+
+    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                ClusterUtils.getCleanShardName(message.getShardPrefix()));
+        final String shardName = shardId.getShardName();
+
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+                shardName, persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+                doRemoveShardReplicaAsync(response.getPrimaryPath());
+            }
+
+            private void doRemoveShardReplicaAsync(final String primaryPath) {
+                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+                        primaryPath, getSender()), getTargetActor());
             }
         });
     }
@@ -1263,19 +1584,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 shardList.remove(shardInfo.getShardName());
             }
         }
-        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(updateShardManagerSnapshot(shardList));
+        LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
     }
 
-    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
-        currentSnapshot = new ShardManagerSnapshot(shardList);
+    private ShardManagerSnapshot updateShardManagerSnapshot(
+            final List<String> shardList,
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+        currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
         return currentSnapshot;
     }
 
     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
         currentSnapshot = snapshot;
 
-        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+        LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
         final MemberName currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
@@ -1283,7 +1606,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for (String shard : currentSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
-                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                LOG.debug("{}: adding shard {}", persistenceId(), shard);
                 configuration.addMemberReplicaForShard(shard, currentMember);
             } else {
                 configuredShardList.remove(shard);
@@ -1291,13 +1614,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
         for (String shard : configuredShardList) {
             // remove the member as a replica for the shard
-            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            LOG.debug("{}: removing shard {}", persistenceId(), shard);
             configuration.removeMemberReplicaForShard(shard, currentMember);
         }
     }
 
-    private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
-        LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+        LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
             0, 0));
@@ -1308,7 +1631,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         String shardName = changeMembersVotingStatus.getShardName();
         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
-        for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+        for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
                     e.getValue());
         }
@@ -1316,8 +1639,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
 
         findLocalShard(shardName, getSender(),
-                localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
-                        localShardFound.getPath(), getSender()));
+            localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+            localShardFound.getPath(), getSender()));
     }
 
     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
@@ -1340,12 +1663,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                     OnDemandRaftState raftState = (OnDemandRaftState) response;
                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
-                    for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                    for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
                     }
 
-                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
-                            toString(), !raftState.isVoting());
+                    serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+                            .toString(), !raftState.isVoting());
 
                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
                             shardName, localShardFound.getPath(), sender);
@@ -1355,30 +1678,49 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    private void findLocalShard(FindLocalShard message) {
+        LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
+        final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+        if (shardInformation == null) {
+            LOG.debug("{}: Local shard {} not found - shards present: {}",
+                    persistenceId(), message.getShardName(), localShards.keySet());
+
+            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+            return;
+        }
+
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
+            () -> new LocalShardFound(shardInformation.getActor()));
+    }
+
     private void findLocalShard(final String shardName, final ActorRef sender,
             final Consumer<LocalShardFound> onLocalShardFound) {
-        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
-                getShardInitializationTimeout().duration().$times(2));
+        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+                .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
-                    LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+                    LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+                            failure);
                     sender.tell(new Status.Failure(new RuntimeException(
                             String.format("Failed to find local shard %s", shardName), failure)), self());
                 } else {
-                    if(response instanceof LocalShardFound) {
-                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
-                    } else if(response instanceof LocalShardNotFound) {
+                    if (response instanceof LocalShardFound) {
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
+                                sender);
+                    } else if (response instanceof LocalShardNotFound) {
                         String msg = String.format("Local shard %s does not exist", shardName);
-                        LOG.debug ("{}: {}", persistenceId, msg);
+                        LOG.debug("{}: {}", persistenceId, msg);
                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
                     } else {
                         String msg = String.format("Failed to find local shard %s: received response: %s",
                                 shardName, response);
-                        LOG.debug ("{}: {}", persistenceId, msg);
+                        LOG.debug("{}: {}", persistenceId, msg);
                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                                 new RuntimeException(msg)), self());
                     }
@@ -1389,7 +1731,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
-        if(isShardReplicaOperationInProgress(shardName, sender)) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
 
@@ -1411,21 +1753,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 if (failure != null) {
                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
                             shardActorRef.path());
-                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+                    LOG.debug("{}: {}", persistenceId(), msg, failure);
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
                 } else {
-                    LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+                    LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
 
                     ServerChangeReply replyMsg = (ServerChangeReply) response;
-                    if(replyMsg.getStatus() == ServerChangeStatus.OK) {
-                        LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+                    if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+                        LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
                         sender.tell(new Status.Success(null), getSelf());
-                    } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+                    } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
-                                "The requested voting state change for shard %s is invalid. At least one member must be voting",
-                                shardId.getShardName()))), getSelf());
+                                "The requested voting state change for shard %s is invalid. At least one member "
+                                + "must be voting", shardId.getShardName()))), getSelf());
                     } else {
-                        LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+                        LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
                                 persistenceId(), shardName, replyMsg.getStatus());
 
                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
@@ -1494,83 +1836,63 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
-        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
-                getShardInitializationTimeout().duration().$times(2));
-
-
-        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(Throwable failure, Object response) {
-                if (failure != null) {
-                    handler.onFailure(failure);
-                } else {
-                    if(response instanceof RemotePrimaryShardFound) {
-                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
-                    } else if(response instanceof LocalPrimaryShardFound) {
-                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
-                    } else {
-                        handler.onUnknownResponse(response);
-                    }
-                }
-            }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
-    }
-
-    private static interface RunnableMessage extends Runnable {
+    private interface RunnableMessage extends Runnable {
     }
 
     /**
      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
-     * a remote or local find primary message is processed
+     * a remote or local find primary message is processed.
      */
-    private static interface FindPrimaryResponseHandler {
+    private interface FindPrimaryResponseHandler {
         /**
-         * Invoked when a Failure message is received as a response
+         * Invoked when a Failure message is received as a response.
          *
-         * @param failure
+         * @param failure the failure exception
          */
         void onFailure(Throwable failure);
 
         /**
-         * Invoked when a RemotePrimaryShardFound response is received
+         * Invoked when a RemotePrimaryShardFound response is received.
          *
-         * @param response
+         * @param response the response
          */
         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
 
         /**
-         * Invoked when a LocalPrimaryShardFound response is received
-         * @param response
+         * Invoked when a LocalPrimaryShardFound response is received.
+         *
+         * @param response the response
          */
         void onLocalPrimaryFound(LocalPrimaryShardFound response);
 
         /**
          * Invoked when an unknown response is received. This is another type of failure.
          *
-         * @param response
+         * @param response the response
          */
         void onUnknownResponse(Object response);
     }
 
     /**
      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
-     * replica and sends a wrapped Failure response to some targetActor
+     * replica and sends a wrapped Failure response to some targetActor.
      */
-    private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+    private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
         private final ActorRef targetActor;
         private final String shardName;
         private final String persistenceId;
         private final ActorRef shardManagerActor;
 
         /**
+         * Constructs an instance.
+         *
          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
          * @param shardName The name of the shard for which the primary replica had to be found
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
-        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
+                ActorRef shardManagerActor) {
             this.targetActor = Preconditions.checkNotNull(targetActor);
             this.shardName = Preconditions.checkNotNull(shardName);
             this.persistenceId = Preconditions.checkNotNull(persistenceId);
@@ -1587,7 +1909,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         @Override
         public void onFailure(Throwable failure) {
-            LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+            LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
             targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
@@ -1596,7 +1918,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         public void onUnknownResponse(Object response) {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
-            LOG.debug ("{}: {}", persistenceId, msg);
+            LOG.debug("{}: {}", persistenceId, msg);
             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                     new RuntimeException(msg)), shardManagerActor);
         }