BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 98a6090514c9549f2f506c82a85fce7376e35cf6..616f56c466bbac02194460dca1d07b4a57b2569f 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,9 +45,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -56,7 +60,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
@@ -83,6 +86,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -134,6 +138,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreSnapshot restoreFromSnapshot;
 
+    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+    private final String persistenceId;
+
     /**
      */
     protected ShardManager(Builder builder) {
@@ -148,6 +156,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.primaryShardInfoCache = builder.primaryShardInfoCache;
         this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
+        String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+        persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
+
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
         // Subscribe this actor to cluster member events
@@ -204,19 +215,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if(message instanceof AddShardReplica){
             onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof ForwardedAddServerReply) {
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerFailure) {
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+            addShard(msg.shardName, msg.primaryFound, getSender());
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if(message instanceof ServerRemoved){
+            onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
                 persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
+    }
 
+    private void onShardReplicaRemoved(ServerRemoved message) {
+        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+        if(shardInformation == null) {
+            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+            return;
+        } else if(shardInformation.getActor() != null) {
+            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+        }
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+        persistShardList();
     }
 
     private void onGetSnapshot() {
@@ -252,47 +288,66 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
-            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-            if(localShards.containsKey(moduleShardConfig.getShardName())) {
-                throw new IllegalStateException(String.format("Shard with name %s already exists",
-                        moduleShardConfig.getShardName()));
+            String shardName = createShard.getModuleShardConfig().getShardName();
+            if(localShards.containsKey(shardName)) {
+                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+            } else {
+                doCreateShard(createShard);
+                reply = new akka.actor.Status.Success(null);
             }
+        } catch (Exception e) {
+            LOG.error("onCreateShard failed", e);
+            reply = new akka.actor.Status.Failure(e);
+        }
 
-            configuration.addModuleShardConfiguration(moduleShardConfig);
+        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
 
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
-            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
-                    moduleShardConfig.getShardMemberNames()*/);
+    private void doCreateShard(CreateShard createShard) {
+        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        String shardName = moduleShardConfig.getShardName();
 
-            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                    moduleShardConfig.getShardMemberNames(), peerAddresses);
+        configuration.addModuleShardConfiguration(moduleShardConfig);
 
-            DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-            if(shardDatastoreContext == null) {
-                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
-            } else {
-                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
-                        peerAddressResolver).build();
-            }
+        DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+        if(shardDatastoreContext == null) {
+            shardDatastoreContext = newShardDatastoreContext(shardName);
+        } else {
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                    peerAddressResolver).build();
+        }
 
-            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
-            localShards.put(info.getShardName(), info);
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-            mBean.addLocalShard(shardId.toString());
+        Map<String, String> peerAddresses;
+        boolean isActiveMember;
+        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+            peerAddresses = getPeerAddresses(shardName);
+            isActiveMember = true;
+        } else {
+            // The local member is not in the given shard member configuration. In this case we'll create
+            // the shard with no peers and with elections disabled so it stays as follower. A
+            // subsequent AddServer request will be needed to make it an active member.
+            isActiveMember = false;
+            peerAddresses = Collections.emptyMap();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+        }
 
-            if(schemaContext != null) {
-                info.setActor(newShardActor(schemaContext, info));
-            }
+        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                moduleShardConfig.getShardMemberNames(), peerAddresses);
 
-            reply = new CreateShardReply();
-        } catch (Exception e) {
-            LOG.error("onCreateShard failed", e);
-            reply = new akka.actor.Status.Failure(e);
-        }
+        ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
 
-        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(reply, getSelf());
+        mBean.addLocalShard(shardId.toString());
+
+        if(schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
         }
     }
 
@@ -677,7 +732,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
-        if (info != null) {
+        if (info != null && info.isActiveMember()) {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
@@ -791,7 +846,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public String persistenceId() {
-        return "shard-manager-" + type;
+        return persistenceId;
     }
 
     @VisibleForTesting
@@ -799,21 +854,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private void checkLocalShardExists(final String shardName, final ActorRef sender) {
-        if (localShards.containsKey(shardName)) {
-            String msg = String.format("Local shard %s already exists", shardName);
+    private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+        if (shardReplicaOperationsInProgress.contains(shardName)) {
+            String msg = String.format("A shard replica operation for %s is already in progress", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return true;
         }
+
+        return false;
     }
 
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        // verify the local shard replica is already available in the controller node
-        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
-
-        checkLocalShardExists(shardName, getSender());
+        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
@@ -832,66 +887,79 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        Map<String, String> peerAddresses = getPeerAddresses(shardName);
-        if (peerAddresses.isEmpty()) {
-            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
-            return;
-        }
-
         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
                 getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
-        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("Failed to find leader for shard %s", shardName), failure)),
-                        getSelf());
+                        String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
-                    if (!(response instanceof RemotePrimaryShardFound)) {
+                    if(response instanceof RemotePrimaryShardFound) {
+                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+                                (RemotePrimaryShardFound)response), sender);
+                    } else if(response instanceof LocalPrimaryShardFound) {
+                        sendLocalReplicaAlreadyExistsReply(shardName, sender);
+                    } else {
                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
                                 shardName, response);
                         LOG.debug ("{}: {}", persistenceId(), msg);
-                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
-                        return;
+                        sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+                            new RuntimeException(msg)), getSelf());
                     }
-
-                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                    addShard (shardName, message, sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+        String msg = String.format("Local shard %s already exists", shardName);
+        LOG.debug ("{}: {}", persistenceId(), msg);
+        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+    }
+
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
-        checkLocalShardExists(shardName, sender);
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
 
-        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        shardReplicaOperationsInProgress.add(shardName);
 
-        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
-                DisableElectionsRaftPolicy.class.getName()).build();
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if(existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
-                          getPeerAddresses(shardName), datastoreContext,
-                          Shard.builder(), peerAddressResolver);
-        shardInfo.setShardActiveMember(false);
-        localShards.put(shardName, shardInfo);
-        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                    DisableElectionsRaftPolicy.class.getName()).build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
+
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
-                response.getPrimaryPath(), shardId);
+                response.getPrimaryPath(), shardInfo.getShardId());
 
-        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+                duration());
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
@@ -900,27 +968,37 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    // Remove the shard
-                    localShards.remove(shardName);
-                    if (shardInfo.getActor() != null) {
-                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
-                    }
-
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName);
+                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
-                    AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                            response.getPrimaryPath(), removeShardOnFailure), sender);
                 }
             }
-        }, new Dispatchers(context().system().dispatchers()).
-            getDispatcher(Dispatchers.DispatcherType.Client));
-        return;
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+            boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
+        if(removeShardOnFailure) {
+            ShardInformation shardInfo = localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+        }
+
+        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+            new RuntimeException(message, failure)), getSelf());
     }
 
-    private void onAddServerReply (String shardName, ShardInformation shardInfo,
-                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+            String leaderPath, boolean removeShardOnFailure) {
+        String shardName = shardInfo.getShardName();
+        shardReplicaOperationsInProgress.remove(shardName);
+
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
@@ -928,35 +1006,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
-            shardInfo.setShardActiveMember(true);
+            shardInfo.setActiveMember(true);
             persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
-            sender.tell(new akka.actor.Status.Success(true), getSelf());
+            sender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+            sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
-            LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            //remove the local replica created
-            localShards.remove(shardName);
-            if (shardInfo.getActor() != null) {
-                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
-            }
+            Exception failure;
             switch (replyMsg.getStatus()) {
                 case TIMEOUT:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                            leaderPath, shardName))), getSelf());
+                    failure = new TimeoutException(String.format(
+                            "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+                            "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName));
                     break;
                 case NO_LEADER:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    failure = createNoShardLeaderException(shardInfo.getShardId());
                     break;
                 default :
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "AddServer request to leader %s for shard %s failed with status %s",
-                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+                    failure = new RuntimeException(String.format(
+                            "AddServer request to leader %s for shard %s failed with status %s",
+                            leaderPath, shardName, replyMsg.getStatus()));
             }
+
+            onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
@@ -976,9 +1054,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void persistShardList() {
-        List<String> shardList = new ArrayList(localShards.keySet());
+        List<String> shardList = new ArrayList<>(localShards.keySet());
         for (ShardInformation shardInfo : localShards.values()) {
-            if (!shardInfo.isShardActiveMember()) {
+            if (!shardInfo.isActiveMember()) {
                 shardList.remove(shardInfo.getShardName());
             }
         }
@@ -1008,6 +1086,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class ForwardedAddServerPrimaryShardFound {
+        String shardName;
+        RemotePrimaryShardFound primaryFound;
+
+        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+            this.shardName = shardName;
+            this.primaryFound = primaryFound;
+        }
+    }
+
+    private static class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
+    private static class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -1031,7 +1149,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
-        private boolean shardActiveStatus = true;
+        private boolean isActiveMember = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
@@ -1056,6 +1174,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
+        @Nullable
         ActorRef getActor(){
             return actor;
         }
@@ -1231,12 +1350,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             this.leaderVersion = leaderVersion;
         }
 
-        void setShardActiveMember(boolean flag) {
-            shardActiveStatus = flag;
+        boolean isActiveMember() {
+            return isActiveMember;
         }
 
-        boolean isShardActiveMember() {
-            return shardActiveStatus;
+        void setActiveMember(boolean isActiveMember) {
+            this.isActiveMember = isActiveMember;
         }
     }