Implement RemoveServer for leader 37/32237/2
authorTom Pantelis <tpanteli@brocade.com>
Wed, 6 Jan 2016 13:15:26 +0000 (08:15 -0500)
committerAnil Vishnoi <vishnoianil@gmail.com>
Fri, 8 Jan 2016 18:42:05 +0000 (18:42 +0000)
Implemented RemoveServer for leader which previously was coded to fail
with the NOT_SUPPORTED error until leadership transfer was implemented.
Leadership transfer will be triggered via the Shutdown message in the
ShardManager via ServerRemoved message. This wil be done in a subsequent
patch.

Change-Id: Iae7895a3801986e482073ccf8ea24e5b720b7618
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java

index 10db6d06c21850a1d2e1a619845c4a4b599e4025..6f941d7dbea306a2f0150aebfe27bc32c01c1ca8 100644 (file)
@@ -235,7 +235,7 @@ public interface RaftActorContext {
      * @return the RaftActor's peer information as a ServerConfigurationPayload if the
      * dynamic server configurations are available, otherwise returns null
      */
-    @Nullable ServerConfigurationPayload getPeerServerInfo();
+    @Nullable ServerConfigurationPayload getPeerServerInfo(boolean includeSelf);
 
     /**
      * @return true if this RaftActor is a voting member of the cluster, false otherwise.
index d9cfbcdd11a24db86a11fccf79fb023101a7becc..f5362b56b467930d69177fbb099e281173757463 100644 (file)
@@ -224,7 +224,8 @@ public class RaftActorContextImpl implements RaftActorContext {
         peerInfoMap.put(id, new PeerInfo(id, address, votingState));
     }
 
-    @Override public void removePeer(String name) {
+    @Override
+    public void removePeer(String name) {
         peerInfoMap.remove(name);
     }
 
@@ -290,7 +291,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    public ServerConfigurationPayload getPeerServerInfo() {
+    public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
         if (!isDynamicServerConfigurationInUse()) {
             return null;
         }
@@ -300,7 +301,10 @@ public class RaftActorContextImpl implements RaftActorContext {
             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
         }
 
-        newConfig.add(new ServerInfo(getId(), true));
+        if(includeSelf) {
+            newConfig.add(new ServerInfo(getId(), votingMember));
+        }
+
         return (new ServerConfigurationPayload(newConfig));
     }
 
index bff21337de19efd081f7a6e4eb2c4b8ed14a10a9..3db0316dd11b02816975d3b15c971e6553230f2c 100644 (file)
@@ -77,15 +77,14 @@ class RaftActorServerConfigurationSupport {
 
     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
-        if(removeServer.getServerId().equals(raftActor.getLeaderId())){
-            // Removing current leader is not supported yet
-            // TODO: To properly support current leader removal we need to first implement transfer of leadership
-            LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
-            sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
-        } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
-            sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
+        boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
+        if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
+            sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
+                    raftActor.getSelf());
         } else {
-            onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
+            String serverAddress = isSelf ? raftActor.self().path().toString() :
+                raftContext.getPeerAddress(removeServer.getServerId());
+            onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
         }
     }
 
@@ -199,7 +198,9 @@ class RaftActorServerConfigurationSupport {
 
         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
             raftContext.setDynamicServerConfigurationInUse();
-            ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
+
+            boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
+            ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
 
             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
index 39548dc6d5e426dad36fa5fe30eb3b5b448a7bc0..56f40df7584b2bae4fdbcf42ae87a87eda09f527 100644 (file)
@@ -125,13 +125,13 @@ class RaftActorSnapshotMessageSupport {
 
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
                     ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
-                    snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo()));
+                    snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
 
             cohort.createSnapshot(snapshotReplyActor);
         } else {
             Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
-                    context.getPeerServerInfo());
+                    context.getPeerServerInfo(true));
 
             sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
                     context.getActor());
index 856c8f5ba8fbd1f21f8a6b9cbc33208013db1eb1..cffd4222220041e5bf9a8ccf7cb2ea7ebff996a1 100644 (file)
@@ -303,7 +303,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
                     context.getTermInformation().getCurrentTerm(),
-                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo());
+                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
index 57b1d92c726bb558ea95b1ac652f89043827bb46..ec1642ec2aa2329c45e84b32a891bdb3533cb570 100644 (file)
@@ -379,7 +379,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedTerm(),
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor(),
-                        context.getPeerServerInfo());
+                        context.getPeerServerInfo(true));
 
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override
index 0e29a3a031e016b218a5c95c11e404aa00cb36f1..3420c4f0831ed3eb1b845f515be6f227bff0e96e 100644 (file)
@@ -722,45 +722,26 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
     }
 
-    @Test
-    public void testRemoveServerSelf() {
-        RaftActorContext initialActorContext = new MockRaftActorContext();
-
-        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
-                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
-                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID));
-
-        leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
-        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
-        assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
-    }
-
     @Test
     public void testRemoveServerForwardToLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
 
-        RaftActorContext initialActorContext = new MockRaftActorContext();
-
-        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
-                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
-                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+        TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(LEADER_ID));
 
         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(FOLLOWER_ID));
-
+        followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
 
         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
-                -1, -1, (short) 0), leaderActor);
+                -1, -1, (short)0), leaderActor);
 
-        followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
-        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
-        assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
+        followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
+        expectFirstMatching(leaderActor, RemoveServer.class);
     }
 
     @Test
@@ -802,6 +783,44 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
     }
 
+    @Test
+    public void testRemoveServerLeader() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
+        final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+
+        TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
+                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                followerActorId);
+
+        TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
+                withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
+        followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
+
+        leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
+
+        final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
+        assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
+        verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
+                votingServer(FOLLOWER_ID));
+
+        MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
+    }
+
     private ServerInfo votingServer(String id) {
         return new ServerInfo(id, true);
     }