From cabb96a79c2b155092e65fb25d271fff85c3f786 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 6 Jan 2016 08:15:26 -0500 Subject: [PATCH] Implement RemoveServer for leader Implemented RemoveServer for leader which previously was coded to fail with the NOT_SUPPORTED error until leadership transfer was implemented. Leadership transfer will be triggered via the Shutdown message in the ShardManager via ServerRemoved message. This wil be done in a subsequent patch. Change-Id: Iae7895a3801986e482073ccf8ea24e5b720b7618 Signed-off-by: Tom Pantelis --- .../cluster/raft/RaftActorContext.java | 2 +- .../cluster/raft/RaftActorContextImpl.java | 10 ++- .../RaftActorServerConfigurationSupport.java | 19 ++--- .../raft/RaftActorSnapshotMessageSupport.java | 4 +- .../cluster/raft/SnapshotManager.java | 2 +- .../cluster/raft/behaviors/Follower.java | 2 +- ...ftActorServerConfigurationSupportTest.java | 69 ++++++++++++------- 7 files changed, 66 insertions(+), 42 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 10db6d06c2..6f941d7dbe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -235,7 +235,7 @@ public interface RaftActorContext { * @return the RaftActor's peer information as a ServerConfigurationPayload if the * dynamic server configurations are available, otherwise returns null */ - @Nullable ServerConfigurationPayload getPeerServerInfo(); + @Nullable ServerConfigurationPayload getPeerServerInfo(boolean includeSelf); /** * @return true if this RaftActor is a voting member of the cluster, false otherwise. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index d9cfbcdd11..f5362b56b4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -224,7 +224,8 @@ public class RaftActorContextImpl implements RaftActorContext { peerInfoMap.put(id, new PeerInfo(id, address, votingState)); } - @Override public void removePeer(String name) { + @Override + public void removePeer(String name) { peerInfoMap.remove(name); } @@ -290,7 +291,7 @@ public class RaftActorContextImpl implements RaftActorContext { } @Override - public ServerConfigurationPayload getPeerServerInfo() { + public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) { if (!isDynamicServerConfigurationInUse()) { return null; } @@ -300,7 +301,10 @@ public class RaftActorContextImpl implements RaftActorContext { newConfig.add(new ServerInfo(peer.getId(), peer.isVoting())); } - newConfig.add(new ServerInfo(getId(), true)); + if(includeSelf) { + newConfig.add(new ServerInfo(getId(), votingMember)); + } + return (new ServerConfigurationPayload(newConfig)); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index bff21337de..3db0316dd1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -77,15 +77,14 @@ class RaftActorServerConfigurationSupport { private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); - if(removeServer.getServerId().equals(raftActor.getLeaderId())){ - // Removing current leader is not supported yet - // TODO: To properly support current leader removal we need to first implement transfer of leadership - LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId()); - sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf()); - } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) { - sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf()); + boolean isSelf = removeServer.getServerId().equals(raftActor.getId()); + if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) { + sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), + raftActor.getSelf()); } else { - onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender)); + String serverAddress = isSelf ? raftActor.self().path().toString() : + raftContext.getPeerAddress(removeServer.getServerId()); + onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender)); } } @@ -199,7 +198,9 @@ class RaftActorServerConfigurationSupport { protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); - ServerConfigurationPayload payload = raftContext.getPeerServerInfo(); + + boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId()); + ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 39548dc6d5..56f40df758 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -125,13 +125,13 @@ class RaftActorSnapshotMessageSupport { ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, - snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo())); + snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true))); cohort.createSnapshot(snapshotReplyActor); } else { Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo()); + context.getPeerServerInfo(true)); sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)), context.getActor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 856c8f5ba8..cffd422222 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -303,7 +303,7 @@ public class SnapshotManager implements SnapshotState { captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(), context.getTermInformation().getCurrentTerm(), - context.getTermInformation().getVotedFor(), context.getPeerServerInfo()); + context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true)); context.getPersistenceProvider().saveSnapshot(snapshot); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 57b1d92c72..ec1642ec2a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -379,7 +379,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo()); + context.getPeerServerInfo(true)); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 0e29a3a031..3420c4f083 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -722,45 +722,26 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus()); } - @Test - public void testRemoveServerSelf() { - RaftActorContext initialActorContext = new MockRaftActorContext(); - - TestActorRef leaderActor = actorFactory.createTestActor( - MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), - initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(LEADER_ID)); - - leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef()); - RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); - assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus()); - } - @Test public void testRemoveServerForwardToLeader() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); - RaftActorContext initialActorContext = new MockRaftActorContext(); - - TestActorRef leaderActor = actorFactory.createTestActor( - MockLeaderRaftActor.props(ImmutableMap.of(), - initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + TestActorRef leaderActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); TestActorRef followerRaftActor = actorFactory.createTestActor( MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); - + followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete(); followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.emptyList(), - -1, -1, (short) 0), leaderActor); + -1, -1, (short)0), leaderActor); - followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef()); - RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); - assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus()); + followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef()); + expectFirstMatching(leaderActor, RemoveServer.class); } @Test @@ -802,6 +783,44 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class); } + @Test + public void testRemoveServerLeader() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID); + final String followerActorPath = actorFactory.createTestActorPath(followerActorId); + RaftActorContext initialActorContext = new MockRaftActorContext(); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + + TestActorRef followerRaftActor = actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), + configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + followerActorId); + + TestActorRef followerCollector = actorFactory.createTestActor(MessageCollectorActor.props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector")); + followerRaftActor.underlyingActor().setCollectorActor(followerCollector); + + leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef()); + RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus()); + + final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class); + assertEquals(0L, applyState.getReplicatedLogEntry().getIndex()); + verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(FOLLOWER_ID)); + + MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class); + } + private ServerInfo votingServer(String id) { return new ServerInfo(id, true); } -- 2.36.6