From b5e778befe870d280b8f7d16b862e7f306a243e3 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 13 Jan 2016 16:17:28 -0500 Subject: [PATCH] Implement change to voting with no leader Implemented a special case where on a voting state change from non-voting to voting, if there's no leader, the server will try to elect a leader in order to apply the change and make progress. This is to handle a use case where one has 2 geographically-separated 3-node clusters, one a primary and the other a backup such that if the primary cluster is lost, the backup can take over. In this scenario, there's a logical 6-node cluster where the primary sub-cluster is configured as voting and the backup sub-cluster as non-voting such that the primary cluster can make progress without consensus from the backup cluster while still replicating to the backup. On fail-over to the backup, a request would be sent to a member of the backup cluster to flip the voting states, i.e. make the backup sub-cluster voting and the lost primary non-voting. However, since the primary majority cluster is lost, there would be no leader to apply, persist and replicate the server config change. Therefore, if the server processing the request is currently non-voting and is to be changed to voting and there is no current leader, it will try to elect itself the leader by applying the new server config change in the RaftActorContext and sending an ElectionTimeout. If it's elected leader, it persists and replicates the new server config. If no leader change occurs within the election timeout period, it reverts the server config change and tries to forward the change request to another server with the same voting state change. In this manner, the intent is to elect the newly voting server that has the most up-to-date log. 
Change-Id: I67b5b2d3a97745dbe9a8215f9a28f3a840f2a0db Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 12 +- .../RaftActorServerConfigurationSupport.java | 180 +++++- .../messages/ChangeServersVotingStatus.java | 19 +- ...ftActorServerConfigurationSupportTest.java | 539 ++++++++++++++++-- 4 files changed, 683 insertions(+), 67 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 304d58beb5..a36807a8c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -202,6 +202,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ + if(getCurrentBehavior() != null) { + try { + getCurrentBehavior().close(); + } catch(Exception e) { + LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e); + } + } + reusableBehaviorStateHolder.init(getCurrentBehavior()); setCurrentBehavior(newBehavior); handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); @@ -247,7 +255,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex()); + LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); } persistence().persist(applyEntries, NoopProcedure.instance()); @@ -445,6 +453,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { 
if(leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } + + serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId()); } if (roleChangeNotifier.isPresent() && diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index e78f39cdb1..09020812bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -13,14 +13,16 @@ import akka.actor.Cancellable; import com.google.common.base.Preconditions; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.UUID; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; @@ -35,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSn import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; 
/** * Handles server configuration related messages for a RaftActor. @@ -85,11 +88,34 @@ class RaftActorServerConfigurationSupport { } } + void onNewLeader(String leaderId) { + currentOperationState.onNewLeader(leaderId); + } + private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) { LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message, currentOperationState); - onNewOperation(new ChangeServersVotingStatusContext(message, sender)); + // The following check is a special case. Normally we fail an operation if there's no leader. + // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and + // the other a backup such that if the primary cluster is lost, the backup can take over. In this + // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting + // and the backup sub-cluster as non-voting such that the primary cluster can make progress without + // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup, + // a request would be sent to a member of the backup cluster to flip the voting states, ie make the + // backup sub-cluster voting and the lost primary non-voting. However since the primary majority + // cluster is lost, there would be no leader to apply, persist and replicate the server config change. + // Therefore, if the local server is currently non-voting and is to be changed to voting and there is + // no current leader, we will try to elect a leader using the new server config in order to replicate + // the change and progress. + boolean localServerChangingToVoting = Boolean.TRUE.equals(message. 
+ getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean hasNoLeader = raftActor.getLeaderId() == null; + if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { + currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true)); + } else { + onNewOperation(new ChangeServersVotingStatusContext(message, sender, false)); + } } private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { @@ -152,7 +178,7 @@ class RaftActorServerConfigurationSupport { ActorSelection leader = raftActor.getLeader(); if (leader != null) { LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader); - leader.forward(operationContext.getOperation(), raftActor.getContext()); + leader.tell(operationContext.getOperation(), operationContext.getClientRequestor()); } else { LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId()); operationContext.getClientRequestor().tell(operationContext.newReply( @@ -197,6 +223,9 @@ class RaftActorServerConfigurationSupport { } + void onNewLeader(String newLeader) { + } + protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); @@ -219,6 +248,10 @@ class RaftActorServerConfigurationSupport { operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK); + changeToIdleState(); + } + + protected void changeToIdleState() { currentOperationState = IDLE; ServerOperationContext nextOperation = pendingOperationsQueue.poll(); @@ -235,8 +268,12 @@ class RaftActorServerConfigurationSupport { } Cancellable newTimer(Object message) { + return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message); + } + + Cancellable newTimer(FiniteDuration timeout, Object message) { return raftContext.getActorSystem().scheduler().scheduleOnce( - 
raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message, + timeout, raftContext.getActor(), message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); } @@ -617,13 +654,17 @@ class RaftActorServerConfigurationSupport { } private static class ChangeServersVotingStatusContext extends ServerOperationContext { - ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) { + private final boolean tryToElectLeader; + + ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor, + boolean tryToElectLeader) { super(convertMessage, clientRequestor); + this.tryToElectLeader = tryToElectLeader; } @Override InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { - return support.new ChangeServersVotingStatusState(this); + return support.new ChangeServersVotingStatusState(this, tryToElectLeader); } @Override @@ -638,7 +679,7 @@ class RaftActorServerConfigurationSupport { boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). 
getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) { - raftActor.initiateLeadershipTransfer(new OnComplete() { + raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId()); @@ -664,21 +705,55 @@ class RaftActorServerConfigurationSupport { @Override String getLoggingContext() { - return getOperation().getServerVotingStatusMap().toString(); + return getOperation().toString(); } } private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState { private final ChangeServersVotingStatusContext changeVotingStatusContext; + private final boolean tryToElectLeader; - ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) { + ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext, + boolean tryToElectLeader) { this.changeVotingStatusContext = changeVotingStatusContext; + this.tryToElectLeader = tryToElectLeader; } @Override public void initiate() { LOG.debug("Initiating ChangeServersVotingStatusState"); + if(tryToElectLeader) { + initiateLocalLeaderElection(); + } else { + updateLocalPeerInfo(); + + persistNewServerConfiguration(changeVotingStatusContext); + } + } + + private void initiateLocalLeaderElection() { + LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId()); + + ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true); + updateLocalPeerInfo(); + + raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor()); + + currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig); + } + + private void updateLocalPeerInfo() { + List newServerInfoList = 
newServerInfoList(); + + raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); + if(raftActor.getCurrentBehavior() instanceof AbstractLeader) { + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + leader.updateMinReplicaCount(); + } + } + + private List newServerInfoList() { Map serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap(); List newServerInfoList = new ArrayList<>(); for(String peerId: raftContext.getPeerIds()) { @@ -689,11 +764,90 @@ class RaftActorServerConfigurationSupport { newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey( raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember())); - raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - leader.updateMinReplicaCount(); + return newServerInfoList; + } + } - persistNewServerConfiguration(changeVotingStatusContext); + private class WaitingForLeaderElected extends OperationState { + private final ServerConfigurationPayload previousServerConfig; + private final ChangeServersVotingStatusContext operationContext; + private final Cancellable timer; + + WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext, + ServerConfigurationPayload previousServerConfig) { + this.operationContext = operationContext; + this.previousServerConfig = previousServerConfig; + + timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(), + new ServerOperationTimeout(operationContext.getLoggingContext())); + } + + @Override + void onNewLeader(String newLeader) { + LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader); + + timer.cancel(); + + if(raftActor.isLeader()) { + persistNewServerConfiguration(operationContext); + } else { + // Edge case - some other node became leader so forward the operation. 
+ LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation()); + + // Revert the local server config change. + raftContext.updatePeerIds(previousServerConfig); + + changeToIdleState(); + RaftActorServerConfigurationSupport.this.onNewOperation(operationContext); + } + } + + @Override + void onServerOperationTimeout(ServerOperationTimeout timeout) { + LOG.warn("{}: Leader election timed out - cannot apply operation {}", + raftContext.getId(), timeout.getLoggingContext()); + + // Revert the local server config change. + raftContext.updatePeerIds(previousServerConfig); + raftActor.initializeBehavior(); + + tryToForwardOperationToAnotherServer(); + } + + private void tryToForwardOperationToAnotherServer() { + Collection serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited()); + + LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(), + serversVisited); + + serversVisited.add(raftContext.getId()); + + // Try to find another whose state is being changed from non-voting to voting and that we haven't + // tried yet. 
+ Map serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap(); + ActorSelection forwardToPeerActor = null; + for(Map.Entry e: serverVotingStatusMap.entrySet()) { + Boolean isVoting = e.getValue(); + String serverId = e.getKey(); + PeerInfo peerInfo = raftContext.getPeerInfo(serverId); + if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { + ActorSelection actor = raftContext.getPeerActorSelection(serverId); + if(actor != null) { + forwardToPeerActor = actor; + break; + } + } + } + + if(forwardToPeerActor != null) { + LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor); + + forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited), + operationContext.getClientRequestor()); + changeToIdleState(); + } else { + operationComplete(operationContext, ServerChangeStatus.NO_LEADER); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java index 0f4601297c..3dc36934f4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java @@ -8,7 +8,10 @@ package org.opendaylight.controller.cluster.raft.messages; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.Nonnull; @@ -22,17 +25,31 @@ public class ChangeServersVotingStatus implements Serializable { private static final long serialVersionUID = 1L; private 
final Map serverVotingStatusMap; + private final Collection serversVisited; public ChangeServersVotingStatus(@Nonnull Map serverVotingStatusMap) { + this(serverVotingStatusMap, Collections.emptySet()); + } + + public ChangeServersVotingStatus(@Nonnull Map serverVotingStatusMap, + @Nonnull Collection serversVisited) { this.serverVotingStatusMap = new HashMap<>(Preconditions.checkNotNull(serverVotingStatusMap)); + this.serversVisited = ImmutableSet.copyOf(Preconditions.checkNotNull(serversVisited)); } + @Nonnull public Map getServerVotingStatusMap() { return serverVotingStatusMap; } + @Nonnull + public Collection getServersVisited() { + return serversVisited; + } + @Override public String toString() { - return "ChangeServersVotingStatus [serverVotingStatusMap=" + serverVotingStatusMap + "]"; + return "ChangeServersVotingStatus [serverVotingStatusMap=" + serverVotingStatusMap + + (serversVisited != null ? ", serversVisited=" + serversVisited : "") + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 3bf328bc16..bdfb2b20c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -25,6 +25,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,13 +33,16 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; -import 
org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; +import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -50,6 +54,7 @@ import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStat import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; @@ -75,7 +80,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static final String NEW_SERVER_ID = "new-server"; static final String NEW_SERVER_ID2 = "new-server2"; private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class); 
- private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider(); + private static final boolean NO_PERSISTENCE = false; + private static final boolean PERSISTENT = true; private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @@ -93,7 +99,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { public void setup() { InMemoryJournal.clear(); InMemorySnapshotStore.clear(); + } + private void setupNewFollower() { DefaultConfigParamsImpl configParams = newFollowerConfigParams(); newFollowerCollectorActor = actorFactory.createTestActor( @@ -125,6 +133,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { @Test public void testAddServerWithExistingFollower() throws Exception { + LOG.info("testAddServerWithExistingFollower starting"); + setupNewFollower(); RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( 0, 3, 1).build()); @@ -209,10 +219,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex()); assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass()); + + LOG.info("testAddServerWithExistingFollower ending"); } @Test public void testAddServerWithNoExistingFollower() throws Exception { + LOG.info("testAddServerWithNoExistingFollower starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); initialActorContext.setCommitIndex(1); initialActorContext.setLastApplied(1); @@ -261,10 +276,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { // Verify new server config was applied in the new follower assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), 
newFollowerActorContext.getPeerIds()); + + LOG.info("testAddServerWithNoExistingFollower ending"); } @Test public void testAddServersAsNonVoting() throws Exception { + LOG.info("testAddServersAsNonVoting starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -326,10 +346,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied()); verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2)); + + LOG.info("testAddServersAsNonVoting ending"); } @Test public void testAddServerWithOperationInProgress() throws Exception { + LOG.info("testAddServerWithOperationInProgress starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -384,10 +409,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { expectMatching(newFollowerCollectorActor, ApplyState.class, 2); assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2), newFollowerActorContext.getPeerIds()); + + LOG.info("testAddServerWithOperationInProgress ending"); } @Test public void testAddServerWithPriorSnapshotInProgress() throws Exception { + LOG.info("testAddServerWithPriorSnapshotInProgress starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -426,10 +456,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied()); verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), 
votingServer(NEW_SERVER_ID)); + + LOG.info("testAddServerWithPriorSnapshotInProgress ending"); } @Test public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception { + LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -453,10 +488,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); + + LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending"); } @Test public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception { + LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -497,10 +537,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing()); + + LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending"); } @Test public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception { + LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -537,10 +582,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); + + 
LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending"); } @Test public void testAddServerWithInstallSnapshotTimeout() throws Exception { + LOG.info("testAddServerWithInstallSnapshotTimeout starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -565,26 +615,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); assertEquals("Leader followers size", 0, ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size()); + + LOG.info("testAddServerWithInstallSnapshotTimeout ending"); } @Test public void testAddServerWithNoLeader() { + LOG.info("testAddServerWithNoLeader starting"); + + setupNewFollower(); DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef noLeaderActor = actorFactory.createTestActor( - MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID, + followerActor.path().toString())).config(configParams).persistent(Optional.of(false)). 
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete(); noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + + LOG.info("testAddServerWithNoLeader ending"); } @Test public void testAddServerWithNoConsensusReached() { + LOG.info("testAddServerWithNoConsensusReached starting"); + + setupNewFollower(); RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -638,10 +699,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef()); addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus()); + + LOG.info("testAddServerWithNoConsensusReached ending"); } @Test public void testAddServerWithExistingServer() { + LOG.info("testAddServerWithExistingServer starting"); + RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -653,10 +718,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus()); + + LOG.info("testAddServerWithExistingServer ending"); } @Test public void testAddServerForwardedToLeader() { + LOG.info("testAddServerForwardedToLeader starting"); + + setupNewFollower(); 
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -665,8 +735,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { actorFactory.generateActorId(LEADER_ID)); TestActorRef followerRaftActor = actorFactory.createTestActor( - MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID, + leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)). + props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete(); @@ -675,15 +746,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); expectFirstMatching(leaderActor, AddServer.class); + + LOG.info("testAddServerForwardedToLeader ending"); } @Test public void testOnApplyState() { + LOG.info("testOnApplyState starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef noLeaderActor = actorFactory.createTestActor( - MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID, + followerActor.path().toString())).config(configParams).persistent(Optional.of(false)). 
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor()); @@ -697,26 +773,35 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("1")); handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender()); assertEquals("Message handled", false, handled); + + LOG.info("testOnApplyState ending"); } @Test public void testRemoveServerWithNoLeader() { + LOG.info("testRemoveServerWithNoLeader starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef leaderActor = actorFactory.createTestActor( - MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID, + followerActor.path().toString())).config(configParams).persistent(Optional.of(false)). 
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); leaderActor.underlyingActor().waitForInitializeBehaviorComplete(); leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef()); RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus()); + + LOG.info("testRemoveServerWithNoLeader ending"); } @Test public void testRemoveServerNonExistentServer() { + LOG.info("testRemoveServerNonExistentServer starting"); + RaftActorContext initialActorContext = new MockRaftActorContext(); TestActorRef leaderActor = actorFactory.createTestActor( @@ -727,10 +812,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef()); RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus()); + + LOG.info("testRemoveServerNonExistentServer ending"); } @Test public void testRemoveServerForwardToLeader() { + LOG.info("testRemoveServerForwardToLeader starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -739,8 +828,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { actorFactory.generateActorId(LEADER_ID)); TestActorRef followerRaftActor = actorFactory.createTestActor( - MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID, + leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)). 
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete(); @@ -749,10 +839,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef()); expectFirstMatching(leaderActor, RemoveServer.class); + + LOG.info("testRemoveServerForwardToLeader ending"); } @Test public void testRemoveServer() { + LOG.info("testRemoveServer starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); @@ -768,16 +862,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + TestActorRef collector = + actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); TestActorRef followerRaftActor = actorFactory.createTestActor( CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()), followerActorId); - TestActorRef collector = - actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector")); - - followerRaftActor.underlyingActor().setCollectorActor(collector); - leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef()); RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); assertEquals("getStatus", 
ServerChangeStatus.OK, removeServerReply.getStatus()); @@ -791,10 +883,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size()); MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class); + + LOG.info("testRemoveServer ending"); } @Test public void testRemoveServerLeader() { + LOG.info("testRemoveServerLeader starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); @@ -810,14 +906,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); - TestActorRef followerRaftActor = actorFactory.createTestActor( - CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), - configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), - followerActorId); - TestActorRef followerCollector = actorFactory.createTestActor(MessageCollectorActor.props(). 
withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector")); - followerRaftActor.underlyingActor().setCollectorActor(followerCollector); + actorFactory.createTestActor( + CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), + configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()), + followerActorId); leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef()); RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); @@ -829,10 +923,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { votingServer(FOLLOWER_ID)); MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class); + + LOG.info("testRemoveServerLeader ending"); } @Test public void testRemoveServerLeaderWithNoFollowers() { + LOG.info("testRemoveServerLeaderWithNoFollowers starting"); + TestActorRef leaderActor = actorFactory.createTestActor( MockLeaderRaftActor.props(Collections.emptyMap(), new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -841,10 +939,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef()); RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class); assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus()); + + LOG.info("testRemoveServerLeaderWithNoFollowers ending"); } @Test public void testChangeServersVotingStatus() { + LOG.info("testChangeServersVotingStatus starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); @@ -860,23 +962,21 
@@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + TestActorRef follower1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); TestActorRef follower1RaftActor = actorFactory.createTestActor( CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), - FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE). + FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector). withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId); - TestActorRef follower1Collector = actorFactory.createTestActor( + + TestActorRef follower2Collector = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector")); - follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector); - TestActorRef follower2RaftActor = actorFactory.createTestActor( CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), - FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE). + FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector). 
withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId); - TestActorRef follower2Collector = actorFactory.createTestActor( - MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId("collector")); - follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector); // Send first ChangeServersVotingStatus message @@ -919,10 +1019,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class); verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2)); + + LOG.info("testChangeServersVotingStatus ending"); } @Test public void testChangeLeaderToNonVoting() { + LOG.info("testChangeLeaderToNonVoting starting"); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS)); @@ -937,23 +1041,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); TestActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor()); + TestActorRef follower1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); TestActorRef follower1RaftActor = actorFactory.createTestActor( CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), - FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE). + FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector). 
withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId); - TestActorRef follower1Collector = actorFactory.createTestActor( + + TestActorRef follower2Collector = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector")); - follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector); - TestActorRef follower2RaftActor = actorFactory.createTestActor( CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(), - FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE). + FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector). withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId); - TestActorRef follower2Collector = actorFactory.createTestActor( - MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId("collector")); - follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector); // Send ChangeServersVotingStatus message @@ -977,6 +1079,326 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { verifyRaftState(RaftState.Follower, leaderActor.underlyingActor()); MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2); + + LOG.info("testChangeLeaderToNonVoting ending"); + } + + @Test + public void testChangeToVotingWithNoLeader() { + LOG.info("testChangeToVotingWithNoLeader starting"); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + + final String node1ID = "node1"; + final String node2ID = "node2"; + + // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting. + // via the server config. The server config will also contain 2 voting peers that are down (ie no + // actors created). 
+ + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(node1ID, false), new ServerInfo(node2ID, false), + new ServerInfo("downNode1", true), new ServerInfo("downNode2", true))); + ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig); + + InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1")); + InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry); + InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2")); + InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry); + + TestActorRef node1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node1RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node1ID, ImmutableMap.of(), configParams, + PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID); + CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor(); + + TestActorRef node2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node2RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node2ID, ImmutableMap.of(), configParams, + PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID); + CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor(); + + // Wait for snapshot after recovery + MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class); + + // Verify the intended server config was loaded and applied. 
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(), + nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"), + votingServer("downNode2")); + assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember()); + assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState()); + assertEquals("getLeaderId", null, node1RaftActor.getLeaderId()); + + MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class); + assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember()); + + // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for + // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause + // node1 to try to elect itself as leader in order to apply the new server config. Since the 2 + // down nodes are switched to non-voting, node1 should only need a vote from node2. + + // First send the message such that node1 has no peer address for node2 - should fail. 
+ + ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true, + node2ID, true, "downNode1", false, "downNode2", false)); + node1RaftActorRef.tell(changeServers, testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus()); + + // Update node2's peer address and send the message again + + node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString()); + + node1RaftActorRef.tell(changeServers, testKit.getRef()); + reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class); + assertEquals("getToIndex", 1, apply.getToIndex()); + verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"), + nonVotingServer("downNode2")); + assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember()); + assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState()); + + apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class); + assertEquals("getToIndex", 1, apply.getToIndex()); + verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"), + nonVotingServer("downNode2")); + assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember()); + assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState()); + + LOG.info("testChangeToVotingWithNoLeader ending"); + } + + @Test + public void 
testChangeToVotingWithNoLeaderAndElectionTimeout() { + LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting"); + + final String node1ID = "node1"; + final String node2ID = "node2"; + + PeerAddressResolver peerAddressResolver = new PeerAddressResolver() { + @Override + public String resolve(String peerId) { + return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) : + peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null; + } + }; + + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(node1ID, false), new ServerInfo(node2ID, true))); + ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig); + + InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry); + InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry); + + DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl(); + configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams1.setElectionTimeoutFactor(1); + configParams1.setPeerAddressResolver(peerAddressResolver); + TestActorRef node1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node1RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node1ID, ImmutableMap.of(), configParams1, + PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID); + CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor(); + + DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl(); + configParams2.setElectionTimeoutFactor(1000000); + 
configParams2.setPeerAddressResolver(peerAddressResolver); + TestActorRef node2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node2RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node2ID, ImmutableMap.of(), configParams2, + PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID); + CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor(); + + // Wait for snapshot after recovery + MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class); + + // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause + // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop + // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous + // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c + // node2 was previously voting. 
+ + node2RaftActor.setDropMessageOfType(RequestVote.class); + + ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true)); + node1RaftActorRef.tell(changeServers, testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus()); + + assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)), + Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig())); + assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState()); + + LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending"); + } + + @Test + public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() { + LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting"); + + final String node1ID = "node1"; + final String node2ID = "node2"; + + PeerAddressResolver peerAddressResolver = new PeerAddressResolver() { + @Override + public String resolve(String peerId) { + return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) : + peerId.equals(node2ID) ? 
actorFactory.createTestActorPath(node2ID) : null; + } + }; + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(3); + configParams.setPeerAddressResolver(peerAddressResolver); + + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(node1ID, false), new ServerInfo(node2ID, false))); + ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig); + + InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry); + InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry); + InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1, + new MockRaftActorContext.MockPayload("2"))); + + TestActorRef node1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node1RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node1ID, ImmutableMap.of(), configParams, + PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID); + CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor(); + + TestActorRef node2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node2RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node2ID, ImmutableMap.of(), configParams, + PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID); + CollectingMockRaftActor node2RaftActor = 
node2RaftActorRef.underlyingActor(); + + // Wait for snapshot after recovery + MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class); + + // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause + // node1 to try to elect itself as leader in order to apply the new server config. However node1's log + // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and + // forward the request to node2. + + ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus( + ImmutableMap.of(node1ID, true, node2ID, true)); + node1RaftActorRef.tell(changeServers, testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class); + verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID)); + assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState()); + + MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class); + verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID)); + assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember()); + assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState()); + + LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending"); + } + + @Test + public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() { + LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting"); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100,
TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + + final String node1ID = "node1"; + final String node2ID = "node2"; + + configParams.setPeerAddressResolver(new PeerAddressResolver() { + @Override + public String resolve(String peerId) { + return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) : + peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null; + } + }); + + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(node1ID, false), new ServerInfo(node2ID, true))); + ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig); + + InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry); + InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1")); + InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry); + + TestActorRef node1Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node1RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node1ID, ImmutableMap.of(), configParams, + PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID); + CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor(); + + TestActorRef node2Collector = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId("collector")); + TestActorRef node2RaftActorRef = actorFactory.createTestActor( + CollectingMockRaftActor.props(node2ID, ImmutableMap.of(), configParams, + PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID); + CollectingMockRaftActor node2RaftActor = 
node2RaftActorRef.underlyingActor(); + + // Wait for snapshot after recovery + MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class); + + // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause + // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop + // RequestVote messages in node2 and make it the leader so node1 should forward the server change + // request to node2 when node2 is elected. + + node2RaftActor.setDropMessageOfType(RequestVote.class); + + ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true, + node2ID, true)); + node1RaftActorRef.tell(changeServers, testKit.getRef()); + + MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class); + + node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender()); + + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus()); + + MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class); + verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID)); + assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember()); + assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState()); + + MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class); + verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(), + votingServer(node1ID), votingServer(node2ID)); + assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState()); + + LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending"); } private void verifyRaftState(RaftState expState, RaftActor... 
raftActors) { @@ -1016,17 +1438,18 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { ReplicatedLogEntry logEntry = log.get(log.lastIndex()); assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass()); ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData(); - assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig())); + assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig())); } private static RaftActorContext newFollowerContext(String id, TestActorRef actor) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); - ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG); + NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(); + ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG); termInfo.update(1, LEADER_ID); return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), - id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG); + id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG); } static abstract class AbstractMockRaftActor extends MockRaftActor { @@ -1034,9 +1457,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { private volatile Class dropMessageOfType; AbstractMockRaftActor(String id, Map peerAddresses, Optional config, - DataPersistenceProvider dataPersistenceProvider, TestActorRef collectorActor) { + boolean persistent, TestActorRef collectorActor) { super(builder().id(id).peerAddresses(peerAddresses).config(config.get()). 
- dataPersistenceProvider(dataPersistenceProvider)); + persistent(Optional.of(persistent))); this.collectorActor = collectorActor; } @@ -1062,14 +1485,26 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { public static class CollectingMockRaftActor extends AbstractMockRaftActor { - CollectingMockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider, TestActorRef collectorActor) { - super(id, peerAddresses, config, dataPersistenceProvider, collectorActor); + CollectingMockRaftActor(String id, Map peerAddresses, Optional config, + boolean persistent, TestActorRef collectorActor) { + super(id, peerAddresses, config, persistent, collectorActor); + snapshotCohortDelegate = new RaftActorSnapshotCohort() { + @Override + public void createSnapshot(ActorRef actorRef) { + actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + } + + @Override + public void applySnapshot(byte[] snapshotBytes) { + } + }; } public static Props props(final String id, final Map peerAddresses, - ConfigParams config, DataPersistenceProvider dataPersistenceProvider){ + ConfigParams config, boolean persistent, TestActorRef collectorActor){ - return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null); + return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), + persistent, collectorActor); } } @@ -1118,7 +1553,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { public static class MockNewFollowerRaftActor extends AbstractMockRaftActor { public MockNewFollowerRaftActor(ConfigParams config, TestActorRef collectorActor) { - super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), null, collectorActor); + super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor); setPersistence(false); } -- 2.36.6