From: Moiz Raja Date: Fri, 20 Nov 2015 01:35:42 +0000 (-0800) Subject: BUG 2187 Handle RemoveShardReplica in ShardManager X-Git-Tag: release/beryllium~77 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e65e1755bd770cf03d3ea15edda9b9cc7a79f3c0 BUG 2187 Handle RemoveShardReplica in ShardManager Change-Id: Iaaabe6ab351d8f0964d32fa9e7e71416a2a10209 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 85f40e3a07..3e14c5837d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -16,6 +16,7 @@ import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.dispatch.OnComplete; @@ -90,6 +91,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -232,8 +235,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof PrimaryShardFoundForContext) { PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; onPrimaryShardFoundContext(primaryShardFoundContext); - } else if(message instanceof RemoveShardReplica){ - onRemoveShardReplica((RemoveShardReplica)message); + } else if(message instanceof RemoveShardReplica) { + onRemoveShardReplica((RemoveShardReplica) message); + } else if(message instanceof WrappedShardResponse){ + onWrappedShardResponse((WrappedShardResponse) message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); } else if(message instanceof ServerRemoved){ @@ -248,12 +253,67 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onWrappedShardResponse(WrappedShardResponse message) { + if (message.getResponse() instanceof RemoveServerReply) { + onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); + } + } + + private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { + shardReplicaOperationsInProgress.remove(shardName); + originalSender.tell(new Status.Success(null), self()); + } + private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender()); + addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), + getSender()); + } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ + removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), + primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); } } + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, + final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout(). + duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug ("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardName, response), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private void onShardReplicaRemoved(ServerRemoved message) { final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); @@ -952,7 +1012,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); - } @Override @@ -1063,40 +1122,50 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.warn ("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); - Exception failure; - switch (replyMsg.getStatus()) { - case TIMEOUT: - failure = new TimeoutException(String.format( - "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + - "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", - leaderPath, shardName)); - break; - case NO_LEADER: - failure = createNoShardLeaderException(shardInfo.getShardId()); - break; - default : - failure = new RuntimeException(String.format( - "AddServer request to leader %s for shard %s failed with status %s", - leaderPath, shardName, replyMsg.getStatus())); - } + Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId()); onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } } - private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { - String shardName = shardReplicaMsg.getShardName(); - - // verify the local shard replica is available in the controller node - if (!localShards.containsKey(shardName)) { - String msg = String.format("Local shard %s does not", shardName); - LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); - return; + private Exception getServerChangeException(Class serverChange, ServerChangeStatus serverChangeStatus, + String leaderPath, ShardIdentifier shardId) { + Exception failure; + switch (serverChangeStatus) { + case TIMEOUT: + failure = new TimeoutException(String.format( + "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + + "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardId.getShardName())); + break; + case NO_LEADER: + failure = createNoShardLeaderException(shardId); + break; + case NOT_SUPPORTED: + failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s", + serverChange.getSimpleName(), shardId.getShardName())); + break; + default : + failure = new RuntimeException(String.format( + "%s request to leader %s for shard %s failed with status %s", + serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); } - // call RemoveShard for the shardName - getSender().tell(new akka.actor.Status.Success(true), getSelf()); - return; + return failure; + } + + private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) { + findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(), + shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + }); } private void persistShardList() { @@ -1489,6 +1558,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private CountDownLatch waitTillReadyCountdownLatch; private PrimaryShardInfoFutureCache primaryShardInfoCache; private DatastoreSnapshot restoreFromSnapshot; + private volatile boolean sealed; @SuppressWarnings("unchecked") @@ -1642,10 +1712,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardName; } - public ActorRef getShardManagerActor() { - return shardManagerActor; - } - @Override public void onFailure(Throwable failure) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); @@ -1675,45 +1741,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final RemotePrimaryShardFound remotePrimaryShardFound; private final LocalPrimaryShardFound localPrimaryShardFound; - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) { + public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, + @Nonnull Object primaryFoundMessage) { this.shardName = Preconditions.checkNotNull(shardName); this.contextMessage = Preconditions.checkNotNull(contextMessage); Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null; + this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? + (RemotePrimaryShardFound) primaryFoundMessage : null; + this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? + (LocalPrimaryShardFound) primaryFoundMessage : null; } @Nonnull - public String getPrimaryPath(){ - if(remotePrimaryShardFound != null){ + String getPrimaryPath(){ + if(remotePrimaryShardFound != null) { return remotePrimaryShardFound.getPrimaryPath(); } return localPrimaryShardFound.getPrimaryPath(); } @Nonnull - public Object getContextMessage() { + Object getContextMessage() { return contextMessage; } @Nullable - public RemotePrimaryShardFound getRemotePrimaryShardFound(){ + RemotePrimaryShardFound getRemotePrimaryShardFound() { return remotePrimaryShardFound; } - @Nullable - public LocalPrimaryShardFound getLocalPrimaryShardFound(){ - return localPrimaryShardFound; + @Nonnull + String getShardName() { + return shardName; } + } + + /** + * The WrappedShardResponse class wraps a response from a Shard. + */ + private static class WrappedShardResponse { + private final String shardName; + private final Object response; - boolean isPrimaryLocal(){ - return (remotePrimaryShardFound == null); + private WrappedShardResponse(String shardName, Object response) { + this.shardName = shardName; + this.response = response; } - @Nonnull - public String getShardName() { + String getShardName() { return shardName; } + + Object getResponse() { + return response; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoveShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoveShardReplica.java index 86c70234f2..54cec906b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoveShardReplica.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoveShardReplica.java @@ -8,34 +8,38 @@ package org.opendaylight.controller.cluster.datastore.messages; -import javax.annotation.Nonnull; import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; /** * A message sent to the ShardManager to dynamically remove a local shard * replica available in this node. */ - public class RemoveShardReplica { private final String shardName; + private final String memberName; /** * Constructor. * * @param shardName name of the local shard that is to be dynamically removed. */ - - public RemoveShardReplica (@Nonnull String shardName){ - this.shardName = Preconditions.checkNotNull(shardName, "ShardName should not be null"); + public RemoveShardReplica (@Nonnull String shardName, @Nonnull String memberName) { + this.shardName = Preconditions.checkNotNull(shardName, "shardName should not be null"); + this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null"); } public String getShardName(){ - return this.shardName; + return shardName; + } + + public String getMemberName() { + return memberName; } @Override - public String toString(){ - return "RemoveShardReplica[ShardName=" + shardName + "]"; + public String toString() { + return "RemoveShardReplica [shardName=" + shardName + ", memberName=" + memberName + "]"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index cef38b9c1b..f9b67567d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -93,6 +93,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -106,6 +107,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -210,13 +213,23 @@ public class ShardManagerTest extends AbstractActorTest { } private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { - return TestShardManager.builder(datastoreContextBuilder).shardActor(mockShardActor); + return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { + return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor); + } + + private Props newPropsShardMgrWithMockShardActor() { return newTestShardMgrBuilderWithMockShardActor().props(); } + private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) { + return newTestShardMgrBuilderWithMockShardActor(shardActor).props(); + } + + private TestShardManager newTestShardManager() { return newTestShardManager(newShardMgrProps()); } @@ -1579,31 +1592,8 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplicaWithAlreadyInProgress() throws Exception { - LOG.info("testAddShardReplicaWithAlreadyInProgress starting"); - new JavaTestKit(getSystem()) {{ - JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); - JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); - - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder(). - put("astronauts", Arrays.asList("member-2")).build()); - - final TestActorRef shardManager = actorFactory.createTestActor( - newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID); - shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); - - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - - mockShardLeaderKit.expectMsgClass(AddServer.class); - - shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef()); - - secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); - }}; - - LOG.info("testAddShardReplicaWithAlreadyInProgress ending"); + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new AddShardReplica("astronauts")); } @Test @@ -1634,12 +1624,171 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - shardManager.tell(new RemoveShardReplica("model-inventory"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef()); + Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, - (resp.cause() instanceof IllegalArgumentException)); + (resp.cause() instanceof PrimaryNotFoundException)); }}; + } + + @Test + /** + * Primary is Local + */ + public void testRemoveShardReplicaLocal() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final TestActorRef respondActor = + TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), respondActor); + respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); + shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef()); + final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); + assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(), + removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Success.class); + }}; + } + + @Test + public void testRemoveShardReplicaRemote() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-1")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = newActorSystem("Member1"); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster( + new ClusterWrapperImpl(system1)).props(), + shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = newActorSystem("Member2"); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + + LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor); + + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster( + new ClusterWrapperImpl(system2)).props(), + shardManagerID); + + // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1 + // However when a shard manager has a local shard which is a follower and a leader that is remote it will + // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will + // look like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1 + // In this specific case if we did a FindPrimary for shard default from member-1 we would come up + // with the address of an actor which does not exist, therefore any message sent to that actor would go to + // dead letters. + // To work around this problem we create a ForwardingActor with the right address and pass to it the + // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every + // thing works as expected + final ActorRef actorRef = leaderShardManager.underlyingActor().context() + .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix); + + LOG.error("Forwarding actor : {}", actorRef); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor); + newReplicaShardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef()); + RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + RemoveServer.class); + String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(); + assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + } + + @Test + public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"), + RemoveServer.class, new RemoveShardReplica("astronauts", "member-3")); + } + + @Test + public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new RemoveShardReplica("astronauts", "member-2")); + } + + + public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange, + final Class firstForwardedServerChangeClass, + final Object secondServerChange) throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put(shardName, Arrays.asList("member-2")).build()); + + final TestActorRef shardManager = TestActorRef.create(getSystem(), + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster( + new MockClusterWrapper()).props(), + shardMgrID); + + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(firstServerChange, getRef()); + + mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass); + + shardManager.tell(secondServerChange, secondRequestKit.getRef()); + + secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); + }}; } @Test @@ -1962,6 +2111,7 @@ public class ShardManagerTest extends AbstractActorTest { private static class MockRespondActor extends MessageCollectorActor { static final String CLEAR_RESPONSE = "clear-response"; + static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class); private volatile Object responseMsg; @@ -1980,12 +2130,15 @@ public class ShardManagerTest extends AbstractActorTest { @Override public void onReceive(Object message) throws Exception { + if(!"get-all-messages".equals(message)) { + LOG.debug("Received message : {}", message); + } super.onReceive(message); - if (message instanceof AddServer) { - if (responseMsg != null) { - getSender().tell(responseMsg, getSelf()); - } - } if(message.equals(CLEAR_RESPONSE)) { + if (message instanceof AddServer && responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + } else if(message instanceof RemoveServer && responseMsg != null){ + getSender().tell(responseMsg, getSelf()); + } else if(message.equals(CLEAR_RESPONSE)) { responseMsg = null; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ForwardingActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ForwardingActor.java new file mode 100644 index 0000000000..f9c3f3e535 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ForwardingActor.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; + +public class ForwardingActor extends UntypedActor{ + private final ActorRef target; + + private ForwardingActor(ActorRef target) { + this.target = target; + } + + @Override + public void onReceive(Object o) throws Exception { + target.forward(o, context()); + } + +}