From 186c5d82335ed7d8c39472355f7b1c1e084c26cd Mon Sep 17 00:00:00 2001 From: kalaiselvik Date: Wed, 28 Oct 2015 18:01:57 +0530 Subject: [PATCH] BUG 2187 - Creating ShardReplica Creating local shard replica with a custom Raftpolicy. Informs Shard leader of the local shard. Processes AddServerReply from shard leader. On successful replication, makes local shard voting capable. On replication failure, local shard is removed. Incorporated the comments Change-Id: Id2b90039c39211b20322bc2d141520723d44c391 Signed-off-by: kalaiselvik --- .../controller/cluster/raft/ConfigParams.java | 6 + .../cluster/raft/DefaultConfigParamsImpl.java | 1 + .../controller/cluster/raft/RaftActor.java | 13 ++ .../policy/DisableElectionsRaftPolicy.java | 5 +- .../cluster/raft/RaftActorTest.java | 50 ++++++ .../cluster/datastore/ShardManager.java | 165 +++++++++++++++++- .../cluster/datastore/ShardManagerTest.java | 119 ++++++++++++- 7 files changed, 346 insertions(+), 13 deletions(-) rename opendaylight/md-sal/sal-akka-raft/src/{test => main}/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java (77%) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index 3645f615e6..c63deae717 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -98,4 +98,10 @@ public interface ConfigParams { * Returns the PeerAddressResolver. */ @Nonnull PeerAddressResolver getPeerAddressResolver(); + + /** + * @return the RaftPolicy class used by this configuration + */ + String getCustomRaftPolicyImplementationClass(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index c64c295cea..46949da17e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -103,6 +103,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass; } + @Override public String getCustomRaftPolicyImplementationClass() { return customRaftPolicyImplementationClass; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index efcf27da20..0154c890b6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -486,7 +486,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void updateConfigParams(ConfigParams configParams) { + + // obtain the RaftPolicy for oldConfigParams and the updated one. + String oldRaftPolicy = context.getConfigParams(). + getCustomRaftPolicyImplementationClass(); + String newRaftPolicy = configParams. + getCustomRaftPolicyImplementationClass(); + + LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", + oldRaftPolicy, newRaftPolicy); context.setConfigParams(configParams); + if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) { + //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour + initializeBehavior(); + } } public final DataPersistenceProvider persistence() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java similarity index 77% rename from opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java rename to opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java index c9d965394a..c6b1e05254 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java @@ -9,10 +9,11 @@ package org.opendaylight.controller.cluster.raft.policy; /** - * DisableElectionsRaftPolicy can be useful for testing purposes where we may want to disable - * elections so that the Leaders for a RaftActor can be set externally. Modification to state would + * DisableElectionsRaftPolicy can be used for actors that does not + * participate in shard election. Modification to state would * still require consensus. */ + public class DisableElectionsRaftPolicy implements RaftPolicy { @Override public boolean automaticElectionsEnabled() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index ed4ea72c16..51d6478fd6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; @@ -1003,4 +1005,52 @@ public class RaftActorTest extends AbstractActorTest { } } + @Test + public void testUpdateConfigParam() throws Exception { + DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl(); + String persistenceId = factory.generateActorId("follower-"); + ImmutableMap peerAddresses = + ImmutableMap.builder().put("member1", "address").build(); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + TestActorRef actorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(emptyConfig), dataPersistenceProvider), persistenceId); + MockRaftActor mockRaftActor = actorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + + RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior(); + mockRaftActor.updateConfigParams(emptyConfig); + assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior()); + assertEquals("Behavior State", RaftState.Follower, + mockRaftActor.getCurrentBehavior().state()); + + DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl(); + disableConfig.setCustomRaftPolicyImplementationClass( + "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); + mockRaftActor.updateConfigParams(disableConfig); + assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior()); + assertEquals("Behavior State", RaftState.Follower, + mockRaftActor.getCurrentBehavior().state()); + + behavior = mockRaftActor.getCurrentBehavior(); + mockRaftActor.updateConfigParams(disableConfig); + assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior()); + assertEquals("Behavior State", RaftState.Follower, + mockRaftActor.getCurrentBehavior().state()); + + DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl(); + defaultConfig.setCustomRaftPolicyImplementationClass( + "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy"); + mockRaftActor.updateConfigParams(defaultConfig); + assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior()); + assertEquals("Behavior State", RaftState.Follower, + mockRaftActor.getCurrentBehavior().state()); + + behavior = mockRaftActor.getCurrentBehavior(); + mockRaftActor.updateConfigParams(defaultConfig); + assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior()); + assertEquals("Behavior State", RaftState.Follower, + mockRaftActor.getCurrentBehavior().state()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 2d79315774..a878f6decb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -8,18 +8,22 @@ package org.opendaylight.controller.cluster.datastore; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Function; import akka.persistence.RecoveryCompleted; import akka.serialization.Serialization; +import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -71,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -746,10 +755,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } + private DatastoreContext getInitShardDataStoreContext() { + return (DatastoreContext.newBuilderFrom(datastoreContext) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) + .build()); + } + private void onAddShardReplica (AddShardReplica shardReplicaMsg) { - String shardName = shardReplicaMsg.getShardName(); + final String shardName = shardReplicaMsg.getShardName(); // verify the local shard replica is already available in the controller node + LOG.debug ("received AddShardReplica for shard {}", shardName); if (localShards.containsKey(shardName)) { LOG.debug ("Local shard {} already available in the controller node", shardName); getSender().tell(new akka.actor.Status.Failure( @@ -767,10 +783,143 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } // Create the localShard - getSender().tell(new akka.actor.Status.Success(true), getSelf()); + if (schemaContext == null) { + LOG.debug ("schemaContext is not updated to create localShardActor"); + getSender().tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "schemaContext not available to create localShardActor for %s", + shardName))), getSelf()); + return; + } + + Map peerAddresses = getPeerAddresses(shardName); + if (peerAddresses.isEmpty()) { + LOG.debug ("Shard peers not available for replicating shard data from leader"); + getSender().tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "Cannot add replica for shard %s because no peer is available", + shardName))), getSelf()); + return; + } + + Timeout findPrimaryTimeout = new Timeout(datastoreContext + .getShardInitializationTimeout().duration().$times(2)); + + final ActorRef sender = getSender(); + Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), + findPrimaryTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("Failed to receive response for FindPrimary of shard {}", + shardName, failure); + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("Failed to find leader for shard %s", shardName), failure)), + getSelf()); + } else { + if (!(response instanceof RemotePrimaryShardFound)) { + LOG.debug ("Shard leader not available for creating local shard replica {}", + shardName); + sender.tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "Invalid response type, %s, received from FindPrimary for shard %s", + response.getClass().getName(), shardName))), getSelf()); + return; + } + RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; + addShard (shardName, message, sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void addShard(final String shardName, final RemotePrimaryShardFound response, + final ActorRef sender) { + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + shardName); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, + cluster.getCurrentMemberName()); + final ShardInformation shardInfo = new ShardInformation(shardName, shardId, + getPeerAddresses(shardName), getInitShardDataStoreContext(), + new DefaultShardPropsCreator(), peerAddressResolver); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + + //inform ShardLeader to add this shard as a replica by sending an AddServer message + LOG.debug ("sending AddServer message to peer {} for shard {}", + response.getPrimaryPath(), shardId); + + Timeout addServerTimeout = new Timeout(datastoreContext + .getShardLeaderElectionTimeout().duration().$times(4)); + Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object addServerResponse) { + if (failure != null) { + LOG.debug ("AddServer request to {} for {} failed", + response.getPrimaryPath(), shardName, failure); + // Remove the shard + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName), failure)), getSelf()); + } else { + AddServerReply reply = (AddServerReply)addServerResponse; + onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath()); + } + } + }, new Dispatchers(context().system().dispatchers()). + getDispatcher(Dispatchers.DispatcherType.Client)); return; } + private void onAddServerReply (String shardName, ShardInformation shardInfo, + AddServerReply replyMsg, ActorRef sender, String leaderPath) { + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("Leader shard successfully added the replica shard {}", + shardName); + // Make the local shard voting capable + shardInfo.setDatastoreContext(datastoreContext, getSelf()); + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + shardName); + mBean.addLocalShard(shardId.toString()); + sender.tell(new akka.actor.Status.Success(true), getSelf()); + } else { + LOG.warn ("Cannot add shard replica {} status {}", + shardName, replyMsg.getStatus()); + LOG.debug ("removing the local shard replica for shard {}", + shardName); + //remove the local replica created + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + switch (replyMsg.getStatus()) { + //case ServerChangeStatus.TIMEOUT: + case TIMEOUT: + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardName))), getSelf()); + break; + //case ServerChangeStatus.NO_LEADER: + case NO_LEADER: + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "There is no shard leader available for shard %s", shardName))), getSelf()); + break; + default : + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "AddServer request to leader %s for shard %s failed with status %s", + leaderPath, shardName, replyMsg.getStatus()))), getSelf()); + } + } + } + private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { String shardName = shardReplicaMsg.getShardName(); boolean deleteStatus = false; @@ -808,7 +957,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; private final ShardPropsCreator shardPropsCreator; private final ShardPeerAddressResolver addressResolver; @@ -993,6 +1142,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } + + void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + this.datastoreContext = datastoreContext; + //notify the datastoreContextchange + LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}", + this.shardName); + if (actor != null) { + actor.tell(this.datastoreContext, sender); + } + } } private static class ShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 77250896d9..4a6754bbb3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -83,6 +83,9 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -1060,16 +1063,96 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { - new JavaTestKit(getSystem()) {{ - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). - put("default", Arrays.asList("member-1", "member-2")). - put("astronauts", Arrays.asList("member-2")).build()); + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig)); + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - expectMsgClass(duration("2 seconds"), Status.Success.class); - }}; + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = ActorSystem.create("cluster-test", + ConfigFactory.load().getConfig("Member2")); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor, + new ClusterWrapperImpl(system2), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + AddServer.class); + String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; + assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); + + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test + public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); + newReplicaShardManager.underlyingActor().waitForMemberUp(); + + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, + (resp.cause() instanceof RuntimeException)); + }}; + + JavaTestKit.shutdownActorSystem(system1); } @Test @@ -1248,4 +1331,24 @@ public class ShardManagerTest extends AbstractActorTest { findPrimaryMessageReceived = new CountDownLatch(1); } } + + private static class MockRespondActor extends MessageCollectorActor { + + private Object responseMsg; + + public void updateResponse(Object response) { + responseMsg = response; + } + + @Override + public void onReceive(Object message) throws Exception { + super.onReceive(message); + if (message instanceof AddServer) { + if (responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + responseMsg = null; + } + } + } + } } -- 2.36.6