From f2b5692224570e7ecccb139594ed55237efeec03 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 12 Jan 2016 18:21:15 -0500 Subject: [PATCH] Implement ChangeServersVotingStatus message in RafActor Added a new ChangeServersVotingStatus message to change servers to/from voting members. The leader updates its local peer info and persists and replicates a new ServerConfigurationPayload with the appropriate voting states. If the leader changes to non-voting it steps down as leader by initiating a leadership transfer. Change-Id: If073e4665cb1a270aae6e3dce36a6b3e900d0282 Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 2 +- .../RaftActorServerConfigurationSupport.java | 152 ++++++++++++++--- .../messages/ChangeServersVotingStatus.java | 38 +++++ .../raft/messages/ServerChangeReply.java | 21 +++ ...ftActorServerConfigurationSupportTest.java | 154 ++++++++++++++++++ 5 files changed, 342 insertions(+), 25 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeReply.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index aea1e9fad4..92b3ff80ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -268,7 +268,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { LOG.debug("{}: Initiating leader transfer", persistenceId()); if(leadershipTransferInProgress == null) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 960ef15a6f..e78f39cdb1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -12,16 +12,23 @@ import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.UUID; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; @@ -59,6 +66,9 @@ class RaftActorServerConfigurationSupport { } else if(message instanceof RemoveServer) { onRemoveServer((RemoveServer) message, sender); return true; + } else if(message instanceof ChangeServersVotingStatus) { + onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender); + return true; } else if (message instanceof ServerOperationTimeout) { currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message); return true; @@ -75,6 +85,13 @@ class RaftActorServerConfigurationSupport { } } + private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) { + LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message, + currentOperationState); + + onNewOperation(new ChangeServersVotingStatusContext(message, sender)); + } + private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); boolean isSelf = removeServer.getServerId().equals(raftActor.getId()); @@ -183,13 +200,14 @@ class RaftActorServerConfigurationSupport { protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); - boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId()); - ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf); + ServerConfigurationPayload payload = raftContext.getPeerServerInfo( + operationContext.includeSelfInNewConfiguration(raftActor)); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); - currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId()))); + currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout( + operationContext.getLoggingContext()))); sendReply(operationContext, ServerChangeStatus.OK); } @@ -199,7 +217,7 @@ class RaftActorServerConfigurationSupport { sendReply(operationContext, replyStatus); } - operationContext.operationComplete(raftActor, replyStatus); + operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK); currentOperationState = IDLE; @@ -272,7 +290,7 @@ class RaftActorServerConfigurationSupport { @Override public void onServerOperationTimeout(ServerOperationTimeout timeout) { LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(), - timeout.getServerId()); + timeout.getLoggingContext()); timedOut = true; @@ -313,7 +331,7 @@ class RaftActorServerConfigurationSupport { } void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) { - String serverId = timeout.getServerId(); + String serverId = timeout.getLoggingContext(); LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId); @@ -398,7 +416,7 @@ class RaftActorServerConfigurationSupport { handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), - timeout.getServerId()); + timeout.getLoggingContext()); } @Override @@ -463,7 +481,7 @@ class RaftActorServerConfigurationSupport { handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", - raftContext.getId(), timeout.getServerId()); + raftContext.getId(), timeout.getLoggingContext()); } } @@ -495,13 +513,18 @@ class RaftActorServerConfigurationSupport { return clientRequestor; } + void operationComplete(RaftActor raftActor, boolean succeeded) { + } + + boolean includeSelfInNewConfiguration(RaftActor raftActor) { + return true; + } + abstract Object newReply(ServerChangeStatus status, String leaderId); abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support); - abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus); - - abstract String getServerId(); + abstract String getLoggingContext(); } /** @@ -523,12 +546,7 @@ class RaftActorServerConfigurationSupport { } @Override - void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) { - - } - - @Override - String getServerId() { + String getLoggingContext() { return getOperation().getNewServerId(); } } @@ -581,27 +599,113 @@ class RaftActorServerConfigurationSupport { } @Override - void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) { + void operationComplete(RaftActor raftActor, boolean succeeded) { if(peerAddress != null) { raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf()); } } @Override - String getServerId() { + boolean includeSelfInNewConfiguration(RaftActor raftActor) { + return !getOperation().getServerId().equals(raftActor.getId()); + } + + @Override + String getLoggingContext() { return getOperation().getServerId(); } } + private static class ChangeServersVotingStatusContext extends ServerOperationContext { + ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) { + super(convertMessage, clientRequestor); + } + + @Override + InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { + return support.new ChangeServersVotingStatusState(this); + } + + @Override + Object newReply(ServerChangeStatus status, String leaderId) { + return new ServerChangeReply(status, leaderId); + } + + @Override + void operationComplete(final RaftActor raftActor, boolean succeeded) { + // If this leader changed to non-voting we need to step down as leader so we'll try to transfer + // leadership. + boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). + getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) { + raftActor.initiateLeadershipTransfer(new OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId()); + ensureFollowerState(raftActor); + } + + @Override + public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + LOG.debug("{}: leader transfer failed after change to non-voting", raftActor.persistenceId()); + ensureFollowerState(raftActor); + } + + private void ensureFollowerState(RaftActor raftActor) { + // Whether or not leadership transfer succeeded, we have to step down as leader and + // switch to Follower so ensure that. + if(raftActor.getRaftState() != RaftState.Follower) { + raftActor.initializeBehavior(); + } + } + }); + } + } + + @Override + String getLoggingContext() { + return getOperation().getServerVotingStatusMap().toString(); + } + } + + private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState { + private final ChangeServersVotingStatusContext changeVotingStatusContext; + + ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) { + this.changeVotingStatusContext = changeVotingStatusContext; + } + + @Override + public void initiate() { + LOG.debug("Initiating ChangeServersVotingStatusState"); + + Map serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap(); + List newServerInfoList = new ArrayList<>(); + for(String peerId: raftContext.getPeerIds()) { + newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ? + serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting())); + } + + newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey( + raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember())); + + raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + leader.updateMinReplicaCount(); + + persistNewServerConfiguration(changeVotingStatusContext); + } + } + static class ServerOperationTimeout { - private final String serverId; + private final String loggingContext; - ServerOperationTimeout(String serverId){ - this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null"); + ServerOperationTimeout(String loggingContext){ + this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null"); } - String getServerId() { - return serverId; + String getLoggingContext() { + return loggingContext; } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java new file mode 100644 index 0000000000..0f4601297c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.messages; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + +/** + * Message sent to change the raft voting status for servers. + * + * @author Thomas Pantelis + */ +public class ChangeServersVotingStatus implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map serverVotingStatusMap; + + public ChangeServersVotingStatus(@Nonnull Map serverVotingStatusMap) { + this.serverVotingStatusMap = new HashMap<>(Preconditions.checkNotNull(serverVotingStatusMap)); + } + + public Map getServerVotingStatusMap() { + return serverVotingStatusMap; + } + + @Override + public String toString() { + return "ChangeServersVotingStatus [serverVotingStatusMap=" + serverVotingStatusMap + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeReply.java new file mode 100644 index 0000000000..4757b0bcc6 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeReply.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.messages; + +/** + * A general server change reply. + * + * @author Thomas Pantelis + */ +public class ServerChangeReply extends AbstractServerChangeReply { + private static final long serialVersionUID = 1L; + + public ServerChangeReply(ServerChangeStatus status, String leaderHint) { + super(status, leaderHint); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 36f6fe7b1a..3bf328bc16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; @@ -20,6 +21,7 @@ import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -44,9 +46,11 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; @@ -67,6 +71,7 @@ import scala.concurrent.duration.FiniteDuration; public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static final String LEADER_ID = "leader"; static final String FOLLOWER_ID = "follower"; + static final String FOLLOWER_ID2 = "follower2"; static final String NEW_SERVER_ID = "new-server"; static final String NEW_SERVER_ID2 = "new-server2"; private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class); @@ -838,6 +843,155 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus()); } + @Test + public void testChangeServersVotingStatus() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID); + final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId); + final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2); + final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, + FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); + TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + + TestActorRef follower1RaftActor = actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), + FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE). + withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId); + TestActorRef follower1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector); + + TestActorRef follower2RaftActor = actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), + FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE). + withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId); + TestActorRef follower2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector); + + // Send first ChangeServersVotingStatus message + + leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)), + testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class); + assertEquals(0L, applyState.getReplicatedLogEntry().getIndex()); + verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + MessageCollectorActor.clearMessages(leaderCollector); + MessageCollectorActor.clearMessages(follower1Collector); + MessageCollectorActor.clearMessages(follower2Collector); + + // Send second ChangeServersVotingStatus message + + leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef()); + reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class); + verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + } + + @Test + public void testChangeLeaderToNonVoting() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS)); + + final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID); + final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId); + final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2); + final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, + FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); + TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + + TestActorRef follower1RaftActor = actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), + FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE). + withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId); + TestActorRef follower1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector); + + TestActorRef follower2RaftActor = actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), + FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE). + withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId); + TestActorRef follower2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector); + + // Send ChangeServersVotingStatus message + + leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class); + verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2)); + + MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class); + verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), + nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2)); + + verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor()); + verifyRaftState(RaftState.Follower, leaderActor.underlyingActor()); + + MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2); + } + + private void verifyRaftState(RaftState expState, RaftActor... raftActors) { + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + for(RaftActor raftActor: raftActors) { + if(raftActor.getRaftState() == expState) { + return; + } + } + } + + fail("None of the RaftActors have state " + expState); + } + private static ServerInfo votingServer(String id) { return new ServerInfo(id, true); } -- 2.36.6