BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index a878f6decbf0cbe51a8c34176fc2a58e522d74c0..616f56c466bbac02194460dca1d07b4a57b2569f 100644 (file)
@@ -19,9 +19,11 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
-import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -34,16 +36,21 @@ import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -51,8 +58,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -63,11 +71,10 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -75,9 +82,11 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -115,9 +124,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
-    private DatastoreContext datastoreContext;
+    private DatastoreContextFactory datastoreContextFactory;
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
@@ -127,45 +136,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private SchemaContext schemaContext;
 
+    private DatastoreSnapshot restoreFromSnapshot;
+
+    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+    private final String persistenceId;
+
     /**
      */
-    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
-            PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
-        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContext = datastoreContext;
-        this.type = datastoreContext.getDataStoreType();
+    protected ShardManager(Builder builder) {
+
+        this.cluster = builder.cluster;
+        this.configuration = builder.configuration;
+        this.datastoreContextFactory = builder.datastoreContextFactory;
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = builder.primaryShardInfoCache;
+        this.restoreFromSnapshot = builder.restoreFromSnapshot;
+
+        String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+        persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
-        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
-                peerAddressResolver).build();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
-    }
-
-    public static Props props(
-            final ClusterWrapper cluster,
-            final Configuration configuration,
-            final DatastoreContext datastoreContext,
-            final CountDownLatch waitTillReadyCountdownLatch,
-            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
-                waitTillReadyCountdownLatch, primaryShardInfoCache));
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
@@ -195,8 +199,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberUnreachable((ClusterEvent.UnreachableMember)message);
         } else if(message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContext) {
-            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory)message);
         } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
@@ -211,51 +215,86 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if(message instanceof AddShardReplica){
             onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof ForwardedAddServerReply) {
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerFailure) {
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+            addShard(msg.shardName, msg.primaryFound, getSender());
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof GetSnapshot) {
+            onGetSnapshot();
+        } else if(message instanceof ServerRemoved){
+            onShardReplicaRemoved((ServerRemoved) message);
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
-
     }
 
-    private void onCreateShard(CreateShard createShard) {
-        Object reply;
-        try {
-            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-            if(localShards.containsKey(moduleShardConfig.getShardName())) {
-                throw new IllegalStateException(String.format("Shard with name %s already exists",
-                        moduleShardConfig.getShardName()));
-            }
-
-            configuration.addModuleShardConfiguration(moduleShardConfig);
+    private void onShardReplicaRemoved(ServerRemoved message) {
+        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+        if(shardInformation == null) {
+            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+            return;
+        } else if(shardInformation.getActor() != null) {
+            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+        }
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+        persistShardList();
+    }
 
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
-            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
-                    moduleShardConfig.getShardMemberNames()*/);
+    private void onGetSnapshot() {
+        LOG.debug("{}: onGetSnapshot", persistenceId());
 
-            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                    moduleShardConfig.getShardMemberNames(), peerAddresses);
+        List<String> notInitialized = null;
+        for(ShardInformation shardInfo: localShards.values()) {
+            if(!shardInfo.isShardInitialized()) {
+                if(notInitialized == null) {
+                    notInitialized = new ArrayList<>();
+                }
 
-            DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-            if(shardDatastoreContext == null) {
-                shardDatastoreContext = datastoreContext;
-            } else {
-                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
-                        peerAddressResolver).build();
+                notInitialized.add(shardInfo.getShardName());
             }
+        }
 
-            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
-            localShards.put(info.getShardName(), info);
+        if(notInitialized != null) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+                    "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+            return;
+        }
 
-            mBean.addLocalShard(shardId.toString());
+        byte[] shardManagerSnapshot = null;
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+                datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
-            if(schemaContext != null) {
-                info.setActor(newShardActor(schemaContext, info));
-            }
+        for(ShardInformation shardInfo: localShards.values()) {
+            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+        }
+    }
 
-            reply = new CreateShardReply();
+    private void onCreateShard(CreateShard createShard) {
+        Object reply;
+        try {
+            String shardName = createShard.getModuleShardConfig().getShardName();
+            if(localShards.containsKey(shardName)) {
+                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+            } else {
+                doCreateShard(createShard);
+                reply = new akka.actor.Status.Success(null);
+            }
         } catch (Exception e) {
             LOG.error("onCreateShard failed", e);
             reply = new akka.actor.Status.Failure(e);
@@ -266,6 +305,61 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void doCreateShard(CreateShard createShard) {
+        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        String shardName = moduleShardConfig.getShardName();
+
+        configuration.addModuleShardConfiguration(moduleShardConfig);
+
+        DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+        if(shardDatastoreContext == null) {
+            shardDatastoreContext = newShardDatastoreContext(shardName);
+        } else {
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                    peerAddressResolver).build();
+        }
+
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+        Map<String, String> peerAddresses;
+        boolean isActiveMember;
+        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+            peerAddresses = getPeerAddresses(shardName);
+            isActiveMember = true;
+        } else {
+            // The local member is not in the given shard member configuration. In this case we'll create
+            // the shard with no peers and with elections disabled so it stays as follower. A
+            // subsequent AddServer request will be needed to make it an active member.
+            isActiveMember = false;
+            peerAddresses = Collections.emptyMap();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+        }
+
+        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                moduleShardConfig.getShardMemberNames(), peerAddresses);
+
+        ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        mBean.addLocalShard(shardId.toString());
+
+        if(schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+                shardPeerAddressResolver(peerAddressResolver);
+    }
+
+    private DatastoreContext newShardDatastoreContext(String shardName) {
+        return newShardDatastoreContextBuilder(shardName).build();
+    }
+
     private void checkReady(){
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
@@ -403,6 +497,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
@@ -441,16 +538,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
-                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                 if(shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
-                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                 }
 
+                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+                        shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -574,13 +672,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
-                peerAddressResolver).build();
+    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+        datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
-            if (info.getActor() != null) {
-                info.getActor().tell(datastoreContext, getSelf());
-            }
+            info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
         }
     }
 
@@ -637,7 +732,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
-        if (info != null) {
+        if (info != null && info.isActiveMember()) {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
@@ -693,20 +788,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
-        ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
-        List<String> localShardActorNames = new ArrayList<>();
+        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+        if(restoreFromSnapshot != null)
+        {
+            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+                shardSnapshots.put(snapshot.getName(), snapshot);
+            }
+        }
+
+        restoreFromSnapshot = null; // null out to GC
+
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator, peerAddressResolver));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+                        shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
@@ -747,7 +846,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public String persistenceId() {
-        return "shard-manager-" + type;
+        return persistenceId;
     }
 
     @VisibleForTesting
@@ -755,181 +854,198 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private DatastoreContext getInitShardDataStoreContext() {
-        return (DatastoreContext.newBuilderFrom(datastoreContext)
-                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
-                .build());
+    private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+        if (shardReplicaOperationsInProgress.contains(shardName)) {
+            String msg = String.format("A shard replica operation for %s is already in progress", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return true;
+        }
+
+        return false;
     }
 
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        // verify the local shard replica is already available in the controller node
-        LOG.debug ("received AddShardReplica for shard {}", shardName);
-        if (localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard {} already available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                   new IllegalArgumentException(String.format("Local shard %s already exists",
-                                                 shardName))), getSelf());
-            return;
-        }
+        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
+
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
-            LOG.debug ("No module configuration exists for shard {}", shardName);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
-                        String.format("No module configuration exists for shard %s",
-                                       shardName))), getSelf());
+            String msg = String.format("No module configuration exists for shard %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
         // Create the localShard
         if (schemaContext == null) {
-            LOG.debug ("schemaContext is not updated to create localShardActor");
-            getSender().tell(new akka.actor.Status.Failure(
-               new IllegalStateException(String.format(
-                  "schemaContext not available to create localShardActor for %s",
-                   shardName))), getSelf());
-            return;
-        }
-
-        Map<String, String> peerAddresses = getPeerAddresses(shardName);
-        if (peerAddresses.isEmpty()) {
-            LOG.debug ("Shard peers not available for replicating shard data from leader");
-            getSender().tell(new akka.actor.Status.Failure(
-                new IllegalStateException(String.format(
-                    "Cannot add replica for shard %s because no peer is available",
-                    shardName))), getSelf());
+            String msg = String.format(
+                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
-        Timeout findPrimaryTimeout = new Timeout(datastoreContext
-                       .getShardInitializationTimeout().duration().$times(2));
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
-        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
-                                       findPrimaryTimeout);
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
-                    LOG.debug ("Failed to receive response for FindPrimary of shard {}",
-                                shardName, failure);
+                    LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("Failed to find leader for shard %s", shardName), failure)),
-                        getSelf());
+                        String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
-                    if (!(response instanceof RemotePrimaryShardFound)) {
-                        LOG.debug ("Shard leader not available for creating local shard replica {}",
-                            shardName);
-                        sender.tell(new akka.actor.Status.Failure(
-                            new IllegalStateException(String.format(
-                                "Invalid response type, %s, received from FindPrimary for shard %s",
-                                response.getClass().getName(), shardName))), getSelf());
-                        return;
+                    if(response instanceof RemotePrimaryShardFound) {
+                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+                                (RemotePrimaryShardFound)response), sender);
+                    } else if(response instanceof LocalPrimaryShardFound) {
+                        sendLocalReplicaAlreadyExistsReply(shardName, sender);
+                    } else {
+                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId(), msg);
+                        sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+                            new RuntimeException(msg)), getSelf());
                     }
-                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                    addShard (shardName, message, sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void addShard(final String shardName, final RemotePrimaryShardFound response,
-                          final ActorRef sender) {
-        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                                                     shardName);
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
-            cluster.getCurrentMemberName());
-        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
-                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
-                          new DefaultShardPropsCreator(), peerAddressResolver);
-        localShards.put(shardName, shardInfo);
-        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+        String msg = String.format("Local shard %s already exists", shardName);
+        LOG.debug ("{}: {}", persistenceId(), msg);
+        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+    }
+
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if(existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                    DisableElectionsRaftPolicy.class.getName()).build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
+
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
-        LOG.debug ("sending AddServer message to peer {} for shard {}",
-            response.getPrimaryPath(), shardId);
+        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                response.getPrimaryPath(), shardInfo.getShardId());
 
-        Timeout addServerTimeout = new Timeout(datastoreContext
-                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+                duration());
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
                 if (failure != null) {
-                    LOG.debug ("AddServer request to {} for {} failed",
-                        response.getPrimaryPath(), shardName, failure);
-                    // Remove the shard
-                    localShards.remove(shardName);
-                    if (shardInfo.getActor() != null) {
-                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
-                    }
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                            response.getPrimaryPath(), shardName, failure);
+
+                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName);
+                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
-                    AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                            response.getPrimaryPath(), removeShardOnFailure), sender);
                 }
             }
-        }, new Dispatchers(context().system().dispatchers()).
-            getDispatcher(Dispatchers.DispatcherType.Client));
-        return;
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onAddServerReply (String shardName, ShardInformation shardInfo,
-                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
-        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("Leader shard successfully added the replica shard {}",
-                        shardName);
-            // Make the local shard voting capable
-            shardInfo.setDatastoreContext(datastoreContext, getSelf());
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                                                         shardName);
-            mBean.addLocalShard(shardId.toString());
-            sender.tell(new akka.actor.Status.Success(true), getSelf());
-        } else {
-            LOG.warn ("Cannot add shard replica {} status {}",
-                      shardName, replyMsg.getStatus());
-            LOG.debug ("removing the local shard replica for shard {}",
-                       shardName);
-            //remove the local replica created
-            localShards.remove(shardName);
+    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+            boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
+        if(removeShardOnFailure) {
+            ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
             }
+        }
+
+        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+            new RuntimeException(message, failure)), getSelf());
+    }
+
+    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+            String leaderPath, boolean removeShardOnFailure) {
+        String shardName = shardInfo.getShardName();
+        shardReplicaOperationsInProgress.remove(shardName);
+
+        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setActiveMember(true);
+            persistShardList();
+
+            mBean.addLocalShard(shardInfo.getShardId().toString());
+            sender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+            sendLocalReplicaAlreadyExistsReply(shardName, sender);
+        } else {
+            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+                    persistenceId(), shardName, replyMsg.getStatus());
+
+            Exception failure;
             switch (replyMsg.getStatus()) {
-                //case ServerChangeStatus.TIMEOUT:
                 case TIMEOUT:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                            leaderPath, shardName))), getSelf());
+                    failure = new TimeoutException(String.format(
+                            "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+                            "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName));
                     break;
-                //case ServerChangeStatus.NO_LEADER:
                 case NO_LEADER:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    failure = createNoShardLeaderException(shardInfo.getShardId());
                     break;
                 default :
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "AddServer request to leader %s for shard %s failed with status %s",
-                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+                    failure = new RuntimeException(String.format(
+                            "AddServer request to leader %s for shard %s failed with status %s",
+                            leaderPath, shardName, replyMsg.getStatus()));
             }
+
+            onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
         String shardName = shardReplicaMsg.getShardName();
-        boolean deleteStatus = false;
 
         // verify the local shard replica is available in the controller node
         if (!localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                   new IllegalArgumentException(String.format("Local shard %s not available",
-                                                shardName))), getSelf());
+            String msg = String.format("Local shard %s does not", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
         // call RemoveShard for the shardName
@@ -937,6 +1053,79 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
+    private void persistShardList() {
+        List<String> shardList = new ArrayList<>(localShards.keySet());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isActiveMember()) {
+                shardList.remove(shardInfo.getShardName());
+            }
+        }
+        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug ("{}: in handleShardRecovery", persistenceId());
+        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+        String currentMember = cluster.getCurrentMemberName();
+        Set<String> configuredShardList =
+            new HashSet<>(configuration.getMemberShardNames(currentMember));
+        for (String shard : snapshot.getShardList()) {
+            if (!configuredShardList.contains(shard)) {
+                // add the current member as a replica for the shard
+                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                configuration.addMemberReplicaForShard(shard, currentMember);
+            } else {
+                configuredShardList.remove(shard);
+            }
+        }
+        for (String shard : configuredShardList) {
+            // remove the member as a replica for the shard
+            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            configuration.removeMemberReplicaForShard(shard, currentMember);
+        }
+    }
+
+    private static class ForwardedAddServerPrimaryShardFound {
+        String shardName;
+        RemotePrimaryShardFound primaryFound;
+
+        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+            this.shardName = shardName;
+            this.primaryFound = primaryFound;
+        }
+    }
+
+    private static class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
+    private static class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -958,28 +1147,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private short leaderVersion;
 
         private DatastoreContext datastoreContext;
-        private final ShardPropsCreator shardPropsCreator;
+        private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean isActiveMember = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
+                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
             this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
-            this.shardPropsCreator = shardPropsCreator;
+            this.builder = builder;
             this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
+            Preconditions.checkNotNull(builder);
+            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+                    schemaContext(schemaContext).props();
+            builder = null;
+            return props;
         }
 
         String getShardName() {
             return shardName;
         }
 
+        @Nullable
         ActorRef getActor(){
             return actor;
         }
@@ -1005,6 +1200,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor != null) {
+                LOG.debug ("Sending new DatastoreContext to {}", shardId);
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
+
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
@@ -1143,39 +1350,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             this.leaderVersion = leaderVersion;
         }
 
-        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
-            this.datastoreContext = datastoreContext;
-            //notify the datastoreContextchange
-            LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
-                 this.shardName);
-            if (actor != null) {
-                actor.tell(this.datastoreContext, sender);
-            }
-        }
-    }
-
-    private static class ShardManagerCreator implements Creator<ShardManager> {
-        private static final long serialVersionUID = 1L;
-
-        final ClusterWrapper cluster;
-        final Configuration configuration;
-        final DatastoreContext datastoreContext;
-        private final CountDownLatch waitTillReadyCountdownLatch;
-        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
-                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            this.cluster = cluster;
-            this.configuration = configuration;
-            this.datastoreContext = datastoreContext;
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            this.primaryShardInfoCache = primaryShardInfoCache;
+        boolean isActiveMember() {
+            return isActiveMember;
         }
 
-        @Override
-        public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
-                    primaryShardInfoCache);
+        void setActiveMember(boolean isActiveMember) {
+            this.isActiveMember = isActiveMember;
         }
     }
 
@@ -1248,6 +1428,74 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return modules;
         }
     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        private DatastoreSnapshot restoreFromSnapshot;
+        private volatile boolean sealed;
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public Builder cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return this;
+        }
+
+        public Builder configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return this;
+        }
+
+        public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return this;
+        }
+
+        public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return this;
+        }
+
+        public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return this;
+        }
+
+        protected void verify() {
+            sealed = true;
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        public Props props() {
+            verify();
+            return Props.create(ShardManager.class, this);
+        }
+    }
 }