import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
* Handles server configuration related messages for a RaftActor.
}
}
+ void onNewLeader(String leaderId) {
+ currentOperationState.onNewLeader(leaderId);
+ }
+
private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
currentOperationState);
- onNewOperation(new ChangeServersVotingStatusContext(message, sender));
+ // The following check is a special case. Normally we fail an operation if there's no leader.
+ // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
+ // the other a backup such that if the primary cluster is lost, the backup can take over. In this
+ // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
+ // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
+ // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
+ // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
+ // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
+ // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
+ // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
+ // no current leader, we will try to elect a leader using the new server config in order to replicate
+ // the change and progress.
+ boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
+ getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+ boolean hasNoLeader = raftActor.getLeaderId() == null;
+ if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
+ currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
+ } else {
+ onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
+ }
}
private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
ActorSelection leader = raftActor.getLeader();
if (leader != null) {
LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
- leader.forward(operationContext.getOperation(), raftActor.getContext());
+ leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
} else {
LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
operationContext.getClientRequestor().tell(operationContext.newReply(
}
+ void onNewLeader(String newLeader) {
+ }
+
protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
raftContext.setDynamicServerConfigurationInUse();
operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
+ changeToIdleState();
+ }
+
+ protected void changeToIdleState() {
currentOperationState = IDLE;
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
}
Cancellable newTimer(Object message) {
+ return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
+ }
+
+ Cancellable newTimer(FiniteDuration timeout, Object message) {
return raftContext.getActorSystem().scheduler().scheduleOnce(
- raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
+ timeout, raftContext.getActor(), message,
raftContext.getActorSystem().dispatcher(), raftContext.getActor());
}
}
private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
- ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) {
+ private final boolean tryToElectLeader;
+
+ ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
+ boolean tryToElectLeader) {
super(convertMessage, clientRequestor);
+ this.tryToElectLeader = tryToElectLeader;
}
@Override
InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
- return support.new ChangeServersVotingStatusState(this);
+ return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
}
@Override
boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
- raftActor.initiateLeadershipTransfer(new OnComplete() {
+ raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
@Override
String getLoggingContext() {
- return getOperation().getServerVotingStatusMap().toString();
+ return getOperation().toString();
}
}
private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
private final ChangeServersVotingStatusContext changeVotingStatusContext;
+ private final boolean tryToElectLeader;
- ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) {
+ ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
+ boolean tryToElectLeader) {
this.changeVotingStatusContext = changeVotingStatusContext;
+ this.tryToElectLeader = tryToElectLeader;
}
@Override
public void initiate() {
LOG.debug("Initiating ChangeServersVotingStatusState");
+ if(tryToElectLeader) {
+ initiateLocalLeaderElection();
+ } else {
+ updateLocalPeerInfo();
+
+ persistNewServerConfiguration(changeVotingStatusContext);
+ }
+ }
+
+ private void initiateLocalLeaderElection() {
+ LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
+
+ ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
+ updateLocalPeerInfo();
+
+ raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
+
+ currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
+ }
+
+ private void updateLocalPeerInfo() {
+ List<ServerInfo> newServerInfoList = newServerInfoList();
+
+ raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
+ if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ leader.updateMinReplicaCount();
+ }
+ }
+
+ private List<ServerInfo> newServerInfoList() {
Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
List<ServerInfo> newServerInfoList = new ArrayList<>();
for(String peerId: raftContext.getPeerIds()) {
newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
- raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
- AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- leader.updateMinReplicaCount();
+ return newServerInfoList;
+ }
+ }
- persistNewServerConfiguration(changeVotingStatusContext);
+ private class WaitingForLeaderElected extends OperationState {
+ private final ServerConfigurationPayload previousServerConfig;
+ private final ChangeServersVotingStatusContext operationContext;
+ private final Cancellable timer;
+
+ WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
+ ServerConfigurationPayload previousServerConfig) {
+ this.operationContext = operationContext;
+ this.previousServerConfig = previousServerConfig;
+
+ timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
+ new ServerOperationTimeout(operationContext.getLoggingContext()));
+ }
+
+ @Override
+ void onNewLeader(String newLeader) {
+ LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
+
+ timer.cancel();
+
+ if(raftActor.isLeader()) {
+ persistNewServerConfiguration(operationContext);
+ } else {
+ // Edge case - some other node became leader so forward the operation.
+ LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+
+ changeToIdleState();
+ RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
+ }
+ }
+
+ @Override
+ void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ LOG.warn("{}: Leader election timed out - cannot apply operation {}",
+ raftContext.getId(), timeout.getLoggingContext());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+ raftActor.initializeBehavior();
+
+ tryToForwardOperationToAnotherServer();
+ }
+
+ private void tryToForwardOperationToAnotherServer() {
+ Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
+
+ LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
+ serversVisited);
+
+ serversVisited.add(raftContext.getId());
+
+ // Try to find another whose state is being changed from non-voting to voting and that we haven't
+ // tried yet.
+ Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
+ ActorSelection forwardToPeerActor = null;
+ for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
+ Boolean isVoting = e.getValue();
+ String serverId = e.getKey();
+ PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
+ if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
+ ActorSelection actor = raftContext.getPeerActorSelection(serverId);
+ if(actor != null) {
+ forwardToPeerActor = actor;
+ break;
+ }
+ }
+ }
+
+ if(forwardToPeerActor != null) {
+ LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
+
+ forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
+ operationContext.getClientRequestor());
+ changeToIdleState();
+ } else {
+ operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
+ }
}
}
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
static final String NEW_SERVER_ID = "new-server";
static final String NEW_SERVER_ID2 = "new-server2";
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
- private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
+ private static final boolean NO_PERSISTENCE = false;
+ private static final boolean PERSISTENT = true;
private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
public void setup() {
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
+ }
+ private void setupNewFollower() {
DefaultConfigParamsImpl configParams = newFollowerConfigParams();
newFollowerCollectorActor = actorFactory.createTestActor(
@Test
public void testAddServerWithExistingFollower() throws Exception {
+ LOG.info("testAddServerWithExistingFollower starting");
+ setupNewFollower();
RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
0, 3, 1).build());
assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
logEntry.getData().getClass());
+
+ LOG.info("testAddServerWithExistingFollower ending");
}
@Test
public void testAddServerWithNoExistingFollower() throws Exception {
+ LOG.info("testAddServerWithNoExistingFollower starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
initialActorContext.setCommitIndex(1);
initialActorContext.setLastApplied(1);
// Verify new server config was applied in the new follower
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
+
+ LOG.info("testAddServerWithNoExistingFollower ending");
}
@Test
public void testAddServersAsNonVoting() throws Exception {
+ LOG.info("testAddServersAsNonVoting starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
+
+ LOG.info("testAddServersAsNonVoting ending");
}
@Test
public void testAddServerWithOperationInProgress() throws Exception {
+ LOG.info("testAddServerWithOperationInProgress starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
newFollowerActorContext.getPeerIds());
+
+ LOG.info("testAddServerWithOperationInProgress ending");
}
@Test
public void testAddServerWithPriorSnapshotInProgress() throws Exception {
+ LOG.info("testAddServerWithPriorSnapshotInProgress starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
votingServer(NEW_SERVER_ID));
+
+ LOG.info("testAddServerWithPriorSnapshotInProgress ending");
}
@Test
public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
+ LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+
+ LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
}
@Test
public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
+ LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
+
+ LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
}
@Test
public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
+ LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+
+ LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
}
@Test
public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+ LOG.info("testAddServerWithInstallSnapshotTimeout starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
assertEquals("Leader followers size", 0,
((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
+
+ LOG.info("testAddServerWithInstallSnapshotTimeout ending");
}
@Test
public void testAddServerWithNoLeader() {
+ LOG.info("testAddServerWithNoLeader starting");
+
+ setupNewFollower();
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
- MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+ LOG.info("testAddServerWithNoLeader ending");
}
@Test
public void testAddServerWithNoConsensusReached() {
+ LOG.info("testAddServerWithNoConsensusReached starting");
+
+ setupNewFollower();
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
+
+ LOG.info("testAddServerWithNoConsensusReached ending");
}
@Test
public void testAddServerWithExistingServer() {
+ LOG.info("testAddServerWithExistingServer starting");
+
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
+
+ LOG.info("testAddServerWithExistingServer ending");
}
@Test
public void testAddServerForwardedToLeader() {
+ LOG.info("testAddServerForwardedToLeader starting");
+
+ setupNewFollower();
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
actorFactory.generateActorId(LEADER_ID));
TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
- MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
+ leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(FOLLOWER_ID));
followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
expectFirstMatching(leaderActor, AddServer.class);
+
+ LOG.info("testAddServerForwardedToLeader ending");
}
@Test
public void testOnApplyState() {
+ LOG.info("testOnApplyState starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
- MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
new MockRaftActorContext.MockPayload("1"));
handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
assertEquals("Message handled", false, handled);
+
+ LOG.info("testOnApplyState ending");
}
@Test
public void testRemoveServerWithNoLeader() {
+ LOG.info("testRemoveServerWithNoLeader starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
- MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
+
+ LOG.info("testRemoveServerWithNoLeader ending");
}
@Test
public void testRemoveServerNonExistentServer() {
+ LOG.info("testRemoveServerNonExistentServer starting");
+
RaftActorContext initialActorContext = new MockRaftActorContext();
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
+
+ LOG.info("testRemoveServerNonExistentServer ending");
}
@Test
public void testRemoveServerForwardToLeader() {
+ LOG.info("testRemoveServerForwardToLeader starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
actorFactory.generateActorId(LEADER_ID));
TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
- MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
+ leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+ props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(FOLLOWER_ID));
followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
expectFirstMatching(leaderActor, RemoveServer.class);
+
+ LOG.info("testRemoveServerForwardToLeader ending");
}
@Test
public void testRemoveServer() {
+ LOG.info("testRemoveServer starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+ TestActorRef<MessageCollectorActor> collector =
+ actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
followerActorId);
- TestActorRef<MessageCollectorActor> collector =
- actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
-
- followerRaftActor.underlyingActor().setCollectorActor(collector);
-
leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
+
+ LOG.info("testRemoveServer ending");
}
@Test
public void testRemoveServerLeader() {
+ LOG.info("testRemoveServerLeader starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
- TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
- CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
- followerActorId);
-
TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
- followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
+ actorFactory.createTestActor(
+ CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
+ configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ followerActorId);
leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
votingServer(FOLLOWER_ID));
MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
+
+ LOG.info("testRemoveServerLeader ending");
}
@Test
public void testRemoveServerLeaderWithNoFollowers() {
+ LOG.info("testRemoveServerLeaderWithNoFollowers starting");
+
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
+
+ LOG.info("testRemoveServerLeaderWithNoFollowers ending");
}
@Test
public void testChangeServersVotingStatus() {
+ LOG.info("testChangeServersVotingStatus starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+ TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
+ FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
- TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+
+ TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
-
TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
+ FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
- TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- actorFactory.generateActorId("collector"));
- follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
// Send first ChangeServersVotingStatus message
MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+
+ LOG.info("testChangeServersVotingStatus ending");
}
@Test
public void testChangeLeaderToNonVoting() {
+ LOG.info("testChangeLeaderToNonVoting starting");
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+ TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
+ FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
- TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+
+ TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
-
TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
+ FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
- TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- actorFactory.generateActorId("collector"));
- follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
// Send ChangeServersVotingStatus message
verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
+
+ LOG.info("testChangeLeaderToNonVoting ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeader() {
+ LOG.info("testChangeToVotingWithNoLeader starting");
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
+ // via the server config. The server config will also contain 2 voting peers that are down (ie no
+ // actors created).
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
+ new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Verify the intended server config was loaded and applied.
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
+ votingServer("downNode2"));
+ assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+ assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
+ assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
+
+ // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
+ // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
+ // down nodes are switched to non-voting, node1 should only need a vote from node2.
+
+ // First send the message such that node1 has no peer address for node2 - should fail.
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
+ node2ID, true, "downNode1", false, "downNode2", false));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+
+ // Update node2's peer address and send the message again
+
+ node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
+
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+ ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ assertEquals("getToIndex", 1, apply.getToIndex());
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
+ nonVotingServer("downNode2"));
+ assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
+
+ apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ assertEquals("getToIndex", 1, apply.getToIndex());
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
+ nonVotingServer("downNode2"));
+ assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeader ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
+ LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
+ @Override
+ public String resolve(String peerId) {
+ return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+ }
+ };
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+ DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
+ configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams1.setElectionTimeoutFactor(1);
+ configParams1.setPeerAddressResolver(peerAddressResolver);
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
+ configParams2.setElectionTimeoutFactor(1000000);
+ configParams2.setPeerAddressResolver(peerAddressResolver);
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+ // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
+ // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
+ // node2 was previously voting.
+
+ node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+
+ assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
+ Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
+ LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
+ @Override
+ public String resolve(String peerId) {
+ return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+ }
+ };
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(3);
+ configParams.setPeerAddressResolver(peerAddressResolver);
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
+ new MockRaftActorContext.MockPayload("2")));
+
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
+ // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
+ // forward the request to node2.
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
+ ImmutableMap.of(node1ID, true, node2ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+ MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
+ LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ configParams.setPeerAddressResolver(new PeerAddressResolver() {
+ @Override
+ public String resolve(String peerId) {
+ return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+ }
+ });
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+ // RequestVote messages in node2 and make it the leader so node1 should forward the server change
+ // request to node2 when node2 is elected.
+
+ node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
+ node2ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
+
+ node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+ MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
}
private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
ReplicatedLogEntry logEntry = log.get(log.lastIndex());
assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
- assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
+ assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
}
private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
- ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
+ NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+ ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
termInfo.update(1, LEADER_ID);
return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+ id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
}
static abstract class AbstractMockRaftActor extends MockRaftActor {
private volatile Class<?> dropMessageOfType;
AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
- DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+ boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
- dataPersistenceProvider(dataPersistenceProvider));
+ persistent(Optional.of(persistent)));
this.collectorActor = collectorActor;
}
public static class CollectingMockRaftActor extends AbstractMockRaftActor {
- CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
- super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
+ CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
+ super(id, peerAddresses, config, persistent, collectorActor);
+ snapshotCohortDelegate = new RaftActorSnapshotCohort() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ }
+ };
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
+ ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
- return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
+ return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
+ persistent, collectorActor);
}
}
public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
- super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
+ super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
setPersistence(false);
}