BUG 2187 - Creating ShardReplica 96/28596/9
authorkalaiselvik <Kalaiselvi_K@Dell.com>
Wed, 28 Oct 2015 12:31:57 +0000 (18:01 +0530)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 29 Oct 2015 18:25:18 +0000 (18:25 +0000)
Creating local shard replica with a custom Raftpolicy. Informs Shard leader of the local shard.
Processes AddServerReply from shard leader.
On successful replication, makes local shard voting capable.
On replication failure, local shard is removed.

Incorporated the comments

Change-Id: Id2b90039c39211b20322bc2d141520723d44c391
Signed-off-by: kalaiselvik <Kalaiselvi_K@Dell.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java [moved from opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java with 77% similarity]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 3645f615e68cc3f8e8a4958f12a7ce1543f636fd..c63deae7171904638c3315798b2d5332ae85a442 100644 (file)
@@ -98,4 +98,10 @@ public interface ConfigParams {
      * Returns the PeerAddressResolver.
      */
     @Nonnull PeerAddressResolver getPeerAddressResolver();
+
+    /**
+     * @return the RaftPolicy class used by this configuration
+     */
+    String getCustomRaftPolicyImplementationClass();
+
 }
index c64c295ceaefe19333f5c49e144a7899d5b5ca1e..46949da17e445d3e63345b47b1f7a74de1baf7c3 100644 (file)
@@ -103,6 +103,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
     }
 
+    @Override
     public String getCustomRaftPolicyImplementationClass() {
         return customRaftPolicyImplementationClass;
     }
index efcf27da20283bda77cb6b2ac2fa8bc3bbd700ad..0154c890b6665b77cec8607dce9d344db592e02b 100644 (file)
@@ -486,7 +486,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void updateConfigParams(ConfigParams configParams) {
+
+        // obtain the RaftPolicy for oldConfigParams and the updated one.
+        String oldRaftPolicy = context.getConfigParams().
+            getCustomRaftPolicyImplementationClass();
+        String newRaftPolicy = configParams.
+            getCustomRaftPolicyImplementationClass();
+
+        LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}",
+            oldRaftPolicy, newRaftPolicy);
         context.setConfigParams(configParams);
+        if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+            //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour
+            initializeBehavior();
+        }
     }
 
     public final DataPersistenceProvider persistence() {
@@ -9,10 +9,11 @@
 package org.opendaylight.controller.cluster.raft.policy;
 
 /**
- * DisableElectionsRaftPolicy can be useful for testing purposes where we may want to disable
- * elections so that the Leaders for a RaftActor can be set externally. Modification to state would
+ * DisableElectionsRaftPolicy can be used for actors that does not
+ * participate in shard election. Modification to state would
  * still require consensus.
  */
+
 public class DisableElectionsRaftPolicy implements RaftPolicy {
     @Override
     public boolean automaticElectionsEnabled() {
index ed4ea72c16c65b57522e8b4f7175b49fc89defa1..51d6478fd6a0a6496b223e95dda79ed442083542 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
@@ -1003,4 +1005,52 @@ public class RaftActorTest extends AbstractActorTest {
         }
     }
 
+    @Test
+    public void testUpdateConfigParam() throws Exception {
+        DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
+        String persistenceId = factory.generateActorId("follower-");
+        ImmutableMap<String, String> peerAddresses =
+            ImmutableMap.<String, String>builder().put("member1", "address").build();
+        DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+        TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
+                MockRaftActor.props(persistenceId, peerAddresses,
+                    Optional.<ConfigParams>of(emptyConfig), dataPersistenceProvider), persistenceId);
+        MockRaftActor mockRaftActor = actorRef.underlyingActor();
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(emptyConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
+        disableConfig.setCustomRaftPolicyImplementationClass(
+            "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+        mockRaftActor.updateConfigParams(disableConfig);
+        assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(disableConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
+        defaultConfig.setCustomRaftPolicyImplementationClass(
+            "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
+        mockRaftActor.updateConfigParams(defaultConfig);
+        assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(defaultConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+    }
 }
index 2d793157744c221454c8a542c65ed926244efb0f..a878f6decbf0cbe51a8c34176fc2a58e522d74c0 100644 (file)
@@ -8,18 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -71,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -746,10 +755,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
+    private DatastoreContext getInitShardDataStoreContext() {
+        return (DatastoreContext.newBuilderFrom(datastoreContext)
+                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
+                .build());
+    }
+
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
-        String shardName = shardReplicaMsg.getShardName();
+        final String shardName = shardReplicaMsg.getShardName();
 
         // verify the local shard replica is already available in the controller node
+        LOG.debug ("received AddShardReplica for shard {}", shardName);
         if (localShards.containsKey(shardName)) {
             LOG.debug ("Local shard {} already available in the controller node", shardName);
             getSender().tell(new akka.actor.Status.Failure(
@@ -767,10 +783,143 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         // Create the localShard
-        getSender().tell(new akka.actor.Status.Success(true), getSelf());
+        if (schemaContext == null) {
+            LOG.debug ("schemaContext is not updated to create localShardActor");
+            getSender().tell(new akka.actor.Status.Failure(
+               new IllegalStateException(String.format(
+                  "schemaContext not available to create localShardActor for %s",
+                   shardName))), getSelf());
+            return;
+        }
+
+        Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        if (peerAddresses.isEmpty()) {
+            LOG.debug ("Shard peers not available for replicating shard data from leader");
+            getSender().tell(new akka.actor.Status.Failure(
+                new IllegalStateException(String.format(
+                    "Cannot add replica for shard %s because no peer is available",
+                    shardName))), getSelf());
+            return;
+        }
+
+        Timeout findPrimaryTimeout = new Timeout(datastoreContext
+                       .getShardInitializationTimeout().duration().$times(2));
+
+        final ActorRef sender = getSender();
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
+                                       findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("Failed to receive response for FindPrimary of shard {}",
+                                shardName, failure);
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("Failed to find leader for shard %s", shardName), failure)),
+                        getSelf());
+                } else {
+                    if (!(response instanceof RemotePrimaryShardFound)) {
+                        LOG.debug ("Shard leader not available for creating local shard replica {}",
+                            shardName);
+                        sender.tell(new akka.actor.Status.Failure(
+                            new IllegalStateException(String.format(
+                                "Invalid response type, %s, received from FindPrimary for shard %s",
+                                response.getClass().getName(), shardName))), getSelf());
+                        return;
+                    }
+                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                    addShard (shardName, message, sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void addShard(final String shardName, final RemotePrimaryShardFound response,
+                          final ActorRef sender) {
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                                                     shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
+            cluster.getCurrentMemberName());
+        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
+                          new DefaultShardPropsCreator(), peerAddressResolver);
+        localShards.put(shardName, shardInfo);
+        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+        //inform ShardLeader to add this shard as a replica by sending an AddServer message
+        LOG.debug ("sending AddServer message to peer {} for shard {}",
+            response.getPrimaryPath(), shardId);
+
+        Timeout addServerTimeout = new Timeout(datastoreContext
+                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure != null) {
+                    LOG.debug ("AddServer request to {} for {} failed",
+                        response.getPrimaryPath(), shardName, failure);
+                    // Remove the shard
+                    localShards.remove(shardName);
+                    if (shardInfo.getActor() != null) {
+                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+                    }
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                } else {
+                    AddServerReply reply = (AddServerReply)addServerResponse;
+                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).
+            getDispatcher(Dispatchers.DispatcherType.Client));
         return;
     }
 
+    private void onAddServerReply (String shardName, ShardInformation shardInfo,
+                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("Leader shard successfully added the replica shard {}",
+                        shardName);
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(datastoreContext, getSelf());
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                                                         shardName);
+            mBean.addLocalShard(shardId.toString());
+            sender.tell(new akka.actor.Status.Success(true), getSelf());
+        } else {
+            LOG.warn ("Cannot add shard replica {} status {}",
+                      shardName, replyMsg.getStatus());
+            LOG.debug ("removing the local shard replica for shard {}",
+                       shardName);
+            //remove the local replica created
+            localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+            switch (replyMsg.getStatus()) {
+                //case ServerChangeStatus.TIMEOUT:
+                case TIMEOUT:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName))), getSelf());
+                    break;
+                //case ServerChangeStatus.NO_LEADER:
+                case NO_LEADER:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    break;
+                default :
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "AddServer request to leader %s for shard %s failed with status %s",
+                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+            }
+        }
+    }
+
     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
         String shardName = shardReplicaMsg.getShardName();
         boolean deleteStatus = false;
@@ -808,7 +957,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private String leaderId;
         private short leaderVersion;
 
-        private final DatastoreContext datastoreContext;
+        private DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
         private final ShardPeerAddressResolver addressResolver;
 
@@ -993,6 +1142,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            //notify the datastoreContextchange
+            LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
+                 this.shardName);
+            if (actor != null) {
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index 77250896d910d50fd0f150b089dfd2c8c69f8428..4a6754bbb3b2cb665bc328d0938007365ced577c 100644 (file)
@@ -83,6 +83,9 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -1060,16 +1063,96 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testAddShardReplica() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
-                put("default", Arrays.asList("member-1", "member-2")).
-                put("astronauts", Arrays.asList("member-2")).build());
+        MockConfiguration mockConfig =
+                new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                   put("default", Arrays.asList("member-1", "member-2")).
+                   put("astronauts", Arrays.asList("member-2")).build());
 
-            ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig));
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
-            shardManager.tell(new AddShardReplica("astronauts"), getRef());
-            expectMsgClass(duration("2 seconds"), Status.Success.class);
-         }};
+        // Create an ActorSystem ShardManager actor for member-1.
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+        ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+        final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
+                newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
+                   new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+        final ActorSystem system2 = ActorSystem.create("cluster-test",
+            ConfigFactory.load().getConfig("Member2"));
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
+        final TestActorRef<MockRespondActor> mockShardLeaderActor =
+            TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+        final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
+                newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
+                        new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
+
+            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+            short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
+            leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
+                    Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
+            leaderShardManager.tell(new RoleChangeNotification(memberId2,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
+
+            newReplicaShardManager.underlyingActor().waitForMemberUp();
+            leaderShardManager.underlyingActor().waitForMemberUp();
+
+            //construct a mock response message
+            AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
+            mockShardLeaderActor.underlyingActor().updateResponse(response);
+            newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
+            AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
+                AddServer.class);
+            String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+            assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
+
+            expectMsgClass(duration("5 seconds"), Status.Success.class);
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+    @Test
+    public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
+        MockConfiguration mockConfig =
+                new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                   put("default", Arrays.asList("member-1", "member-2")).
+                   put("astronauts", Arrays.asList("member-2")).build());
+
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+        ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+        final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
+                newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
+                   new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
+            newReplicaShardManager.underlyingActor().waitForMemberUp();
+
+            newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
+            Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+            assertEquals("Failure obtained", true,
+                          (resp.cause() instanceof RuntimeException));
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
     }
 
     @Test
@@ -1248,4 +1331,24 @@ public class ShardManagerTest extends AbstractActorTest {
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
     }
+
+    private static class MockRespondActor extends MessageCollectorActor {
+
+        private Object responseMsg;
+
+        public void updateResponse(Object response) {
+            responseMsg = response;
+        }
+
+        @Override
+        public void onReceive(Object message) throws Exception {
+            super.onReceive(message);
+            if (message instanceof AddServer) {
+                if (responseMsg != null) {
+                    getSender().tell(responseMsg, getSelf());
+                    responseMsg = null;
+                }
+            }
+        }
+    }
 }