Send Shutdown message to Shard on ServerRemoved 38/32238/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 6 Jan 2016 13:24:33 +0000 (08:24 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 11 Jan 2016 19:10:55 +0000 (14:10 -0500)
Modified the ShardManager to send the Shutdown message to the Shard on
ServerRemoved. If the shard was the leader, this will trigger leadership
transfer.

I also made changes to propagate the appropriate error to the caller on
RemoveServerReply instead of always replying with success.

Added a test case in ClusterAdminRpcServiceTest for removing the leader.

Change-Id: I30d2a22f07c1003fad2aba68e4f2d1d2c9fe7eb3
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index f5362b5..4677ea9 100644 (file)
@@ -226,7 +226,11 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     @Override
     public void removePeer(String name) {
-        peerInfoMap.remove(name);
+        if(getId().equals(name)) {
+            votingMember = false;
+        } else {
+            peerInfoMap.remove(name);
+        }
     }
 
     @Override public ActorSelection getPeerActorSelection(String peerId) {
index c39c800..b6833f3 100644 (file)
@@ -303,13 +303,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onWrappedShardResponse(WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
-            onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
+            onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+                    message.getLeaderPath());
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
-        shardReplicaOperationsInProgress.remove(shardName);
-        originalSender.tell(new Status.Success(null), self());
+    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+            String leaderPath) {
+        shardReplicaOperationsInProgress.remove(shardId);
+
+        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+                    shardId.getShardName());
+            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else {
+            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+                    persistenceId(), shardId, replyMsg.getStatus());
+
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+                    leaderPath, shardId);
+            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+        }
     }
 
     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
@@ -356,7 +372,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
                 } else {
                     // SUCCESS
-                    self().tell(new WrappedShardResponse(shardName, response), sender);
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -369,8 +385,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
         } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(new Shutdown(), self());
         }
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
         persistShardList();
@@ -1834,21 +1850,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * The WrappedShardResponse class wraps a response from a Shard.
      */
     private static class WrappedShardResponse {
-        private final String shardName;
+        private final ShardIdentifier shardId;
         private final Object response;
+        private final String leaderPath;
 
-        private WrappedShardResponse(String shardName, Object response) {
-            this.shardName = shardName;
+        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+            this.shardId = shardId;
             this.response = response;
+            this.leaderPath = leaderPath;
         }
 
-        String getShardName() {
-            return shardName;
+        ShardIdentifier getShardId() {
+            return shardId;
         }
 
         Object getResponse() {
             return response;
         }
+
+        String getLeaderPath() {
+            return leaderPath;
+        }
     }
 }
 
index 395ac22..4e45dc4 100644 (file)
@@ -25,7 +25,6 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
-import akka.actor.Terminated;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
@@ -1855,8 +1854,6 @@ public class ShardManagerTest extends AbstractActorTest {
             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
 
-            watch(shard);
-
             shardManager.underlyingActor().waitForRecoveryComplete();
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@@ -1870,7 +1867,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
 
-            expectMsgClass(duration("5 seconds"), Terminated.class);
+            MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
         }};
 
         LOG.info("testServerRemovedShardActorRunning ending");
index 80aafa2..6e16dcf 100644 (file)
@@ -7,9 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore.admin;
 
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
@@ -355,6 +358,53 @@ public class ClusterAdminRpcServiceTest {
         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
     }
 
+    @Test
+    public void testRemoveShardLeaderReplica() throws Exception {
+        String name = "testRemoveShardLeaderReplica";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).
+                datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        replicaNode2.waitForMembersUp("member-1", "member-3");
+        replicaNode2.waitForMembersUp("member-1", "member-2");
+
+        // Invoke RPC service on leader member-1 to remove it's local shard
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore());
+
+        RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
+                setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
+                        get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
+        service1.close();
+
+        verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
+                        containsString("member-3")));
+            }
+        });
+
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
+        verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
+    }
+
     @Test
     public void testAddReplicasForAllShards() throws Exception {
         String name = "testAddReplicasForAllShards";