From: Tom Pantelis Date: Wed, 6 Jan 2016 13:24:33 +0000 (-0500) Subject: Send Shutdown message to Shard on ServerRemoved X-Git-Tag: release/beryllium~33 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ffe27dc93dc6f6a190287164f10444f4f6838d59 Send Shutdown message to Shard on ServerRemoved Modified the ShardManager to send the Shutdown message to the Shard on ServerRemoved. If the shard was the leader, this will trigger leadership transfer. I also made changes to propagate the appropriate error to the caller on RemoveServerReply instead of always replying with success. Added a test case in ClusterAdminRpcServiceTest for removing the leader. Change-Id: I30d2a22f07c1003fad2aba68e4f2d1d2c9fe7eb3 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index f5362b56b4..4677ea946e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -226,7 +226,11 @@ public class RaftActorContextImpl implements RaftActorContext { @Override public void removePeer(String name) { - peerInfoMap.remove(name); + if(getId().equals(name)) { + votingMember = false; + } else { + peerInfoMap.remove(name); + } } @Override public ActorSelection getPeerActorSelection(String peerId) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index c39c80021c..b6833f3004 100644 --- 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -303,13 +303,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onWrappedShardResponse(WrappedShardResponse message) { if (message.getResponse() instanceof RemoveServerReply) { - onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); + onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(), + message.getLeaderPath()); } } - private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { - shardReplicaOperationsInProgress.remove(shardName); - originalSender.tell(new Status.Success(null), self()); + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, + String leaderPath) { + shardReplicaOperationsInProgress.remove(shardId); + + LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); + + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), + shardId.getShardName()); + originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + } else { + LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", + persistenceId(), shardId, replyMsg.getStatus()); + + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), + leaderPath, shardId); + originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); + } } private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { @@ -356,7 +372,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { 
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); } else { // SUCCESS - self().tell(new WrappedShardResponse(shardName, response), sender); + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -369,8 +385,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; } else if(shardInformation.getActor() != null) { - LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(PoisonPill.getInstance(), self()); + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); + shardInformation.getActor().tell(new Shutdown(), self()); } LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); persistShardList(); @@ -1834,21 +1850,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * The WrappedShardResponse class wraps a response from a Shard. 
*/ private static class WrappedShardResponse { - private final String shardName; + private final ShardIdentifier shardId; private final Object response; + private final String leaderPath; - private WrappedShardResponse(String shardName, Object response) { - this.shardName = shardName; + private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + this.shardId = shardId; this.response = response; + this.leaderPath = leaderPath; } - String getShardName() { - return shardName; + ShardIdentifier getShardId() { + return shardId; } Object getResponse() { return response; } + + String getLeaderPath() { + return leaderPath; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 395ac223e0..4e45dc4f21 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -25,7 +25,6 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; import akka.actor.Status.Success; -import akka.actor.Terminated; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; @@ -1855,8 +1854,6 @@ public class ShardManagerTest extends AbstractActorTest { TestActorRef shardManager = actorFactory.createTestActor( newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); - watch(shard); - shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -1870,7 +1867,7 @@ public class ShardManagerTest extends AbstractActorTest { 
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); - expectMsgClass(duration("5 seconds"), Terminated.class); + MessageCollectorActor.expectFirstMatching(shard, Shutdown.class); }}; LOG.info("testServerRemovedShardActorRunning ending"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 80aafa26e8..6e16dcfb09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -7,9 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore.admin; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent; @@ -355,6 +358,53 @@ public class ClusterAdminRpcServiceTest { verifyNoShardPresent(replicaNode2.configDataStore(), "cars"); } + @Test + public void testRemoveShardLeaderReplica() throws Exception { + String name = "testRemoveShardLeaderReplica"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig). 
+ datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + replicaNode2.waitForMembersUp("member-1", "member-3"); + replicaNode2.waitForMembersUp("member-1", "member-2"); + + // Invoke RPC service on leader member-1 to remove its local shard + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore()); + + RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder(). + setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + service1.close(); + + verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"), + containsString("member-3"))); + } + }); + + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2"); + verifyNoShardPresent(leaderNode1.configDataStore(), "cars"); + } + @Test public void testAddReplicasForAllShards() throws Exception { String name = "testAddReplicasForAllShards";