+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
private Leader leader;
+ private final short payloadVersion = 5;
@Override
@After
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ short payloadVersion = (short)5;
+ actorContext.setPayloadVersion(payloadVersion);
long term = 1;
actorContext.getTermInformation().update(term, "");
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// The follower would normally reply - simulate that explicitly here.
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex - 1, term));
+ FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
assertEquals("Entries size", 1, appendEntries.getEntries().size());
assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
- MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
- 1, lastIndex + 1, payload);
- actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
// State should not change
assertTrue(raftBehavior instanceof Leader);
assertEquals("Entries size", 1, appendEntries.getEntries().size());
assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
- assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
}
@Test
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
for(int i=0;i<3;i++) {
sendReplicate(actorContext, lastIndex+i+1);
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int newEntryIndex = 4;
final int snapshotTerm = 1;
final int currentTerm = 2;
// set the snapshot variables in replicatedlog
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
//set follower timeout to 2 mins, helps during debugging
actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ assertEquals(commitIndex, is.getLastIncludedIndex());
}
@Test
assertTrue(raftBehavior instanceof Leader);
- MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
}
@Test
MockRaftActorContext actorContext = createActorContextWithFollower();
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(Optional.<ByteString>absent());
+ leader.setSnapshot(null);
// new entry
ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
- assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
+ @Test
+ public void testInitiateForceInstallSnapshot() throws Exception {
+ logStart("testInitiateForceInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = -1;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(null);
+
+ for(int i=0;i<4;i++) {
+ actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+ new MockRaftActorContext.MockPayload("X" + i)));
+ }
+
+ // new entry
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ actorContext.getReplicatedLog().append(entry);
+
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
+
+ // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+ // installed with a SendInstallSnapshot
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
+
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
+
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ }
+
+
@Test
public void testInstallSnapshot() throws Exception {
logStart("testInstallSnapshot");
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
leader = new Leader(actorContext);
- // Ignore initial heartbeat.
+ // Initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new SendInstallSnapshot(toByteString(leadersSnapshot)));
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
assertTrue(raftBehavior instanceof Leader);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
- assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+ assertEquals(currentTerm, installSnapshot.getTerm());
+ }
+
+ @Test
+ public void testForceInstallSnapshot() throws Exception {
+ logStart("testForceInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = -1;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
+
+ leader = new Leader(actorContext);
+
+ // Initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // check if installsnapshot gets called with the correct values.
+
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertNotNull(installSnapshot.getData());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
assertEquals(currentTerm, installSnapshot.getTerm());
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// Ignore initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
assertEquals(1, leader.followerLogSize());
FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
assertNotNull(fli);
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex + 1, fli.getNextIndex());
+ assertEquals(commitIndex, fli.getMatchIndex());
+ assertEquals(commitIndex + 1, fli.getNextIndex());
}
@Test
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
actorContext.setConfigParams(configParams);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
}
});
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
}
});
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
configParams.setElectionTimeoutFactor(100000);
MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
context.setConfigParams(configParams);
+ context.setPayloadVersion(payloadVersion);
return context;
}
+ private MockRaftActorContext createFollowerActorContextWithLeader() {
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+ followerConfig.setElectionTimeoutFactor(10000);
+ followerActorContext.setConfigParams(followerConfig);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ return followerActorContext;
+ }
+
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
}
@Test
- public void testHandleAppendEntriesReplyFailure(){
- logStart("testHandleAppendEntriesReplyFailure");
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ long leaderCommitIndex = 2;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(0);
+ followerActorContext.setLastApplied(0);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
leader = new Leader(leaderActorContext);
- // Send initial heartbeat reply with last index.
- leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
- assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+ assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+ }
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTerm(){
+ logStart("testHandleAppendEntriesReplyWithNewerTerm");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+ logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
}
@Test
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+ assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+
+ short payloadVersion = 5;
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
ApplyState applyState = applyStateList.get(0);
assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals(payloadVersion, followerInfo.getPayloadVersion());
}
@Test
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
}
@Test
- public void testHandleRequestVoteReply(){
- logStart("testHandleRequestVoteReply");
+ public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+ logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
- MockRaftActorContext leaderActorContext = createActorContext();
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
- leader = new Leader(leaderActorContext);
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+ long leaderCommitIndex = 3;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
- // Should be a no-op.
- RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
- new RequestVoteReply(1, true));
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
- raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
- }
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
- @Test
- public void testIsolatedLeaderCheckNoFollowers() {
- logStart("testIsolatedLeaderCheckNoFollowers");
+ leader = new Leader(leaderActorContext);
- MockRaftActorContext leaderActorContext = createActorContext();
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue(behavior instanceof Leader);
- }
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
- @Test
- public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
- logStart("testIsolatedLeaderCheckTwoFollowers");
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
- new JavaTestKit(getSystem()) {{
+ leaderActor.underlyingActor().setBehavior(leader);
- ActorRef followerActor1 = getTestActor();
- ActorRef followerActor2 = getTestActor();
+ leader.handleMessage(followerActor, appendEntriesReply);
- MockRaftActorContext leaderActorContext = createActorContext();
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1", followerActor1.path().toString());
- peerAddresses.put("follower-2", followerActor2.path().toString());
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- leaderActorContext.setPeerAddresses(peerAddresses);
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
- leader = new Leader(leaderActorContext);
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- leader.markFollowerActive("follower-1");
- leader.markFollowerActive("follower-2");
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when all followers are active",
- behavior instanceof Leader);
+ assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
- // kill 1 follower and verify if that got killed
- final JavaTestKit probe = new JavaTestKit(getSystem());
- probe.watch(followerActor1);
- followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
- final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
- assertEquals(termMsg1.getActor(), followerActor1);
-
- leader.markFollowerInActive("follower-1");
- leader.markFollowerActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
- behavior instanceof Leader);
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
- // kill 2nd follower and leader should change to Isolated leader
- followerActor2.tell(PoisonPill.getInstance(), null);
- probe.watch(followerActor2);
- followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
- final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
- assertEquals(termMsg2.getActor(), followerActor2);
-
- leader.markFollowerInActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
- behavior instanceof IsolatedLeader);
- }};
- }
+ MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+ assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+ }
@Test
- public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
- logStart("testAppendEntryCallAtEndofAppendEntryReply");
+ public void testHandleRequestVoteReply(){
+ logStart("testHandleRequestVoteReply");
- MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ MockRaftActorContext leaderActorContext = createActorContext();
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+ leader = new Leader(leaderActorContext);
- leaderActorContext.setConfigParams(configParams);
+ // Should be a no-op.
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+ new RequestVoteReply(1, true));
- MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
- followerActorContext.setConfigParams(configParams);
- followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
- Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().setBehavior(follower);
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
- leaderActorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.setCommitIndex(-1);
- leaderActorContext.setLastApplied(-1);
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ logStart("testIsolatedLeaderCheckNoFollowers");
- followerActorContext.getReplicatedLog().removeFrom(0);
- followerActorContext.setCommitIndex(-1);
- followerActorContext.setLastApplied(-1);
+ MockRaftActorContext leaderActorContext = createActorContext();
leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
- leaderActor, AppendEntriesReply.class);
-
- leader.handleMessage(followerActor, appendEntriesReply);
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+ ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
+ ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
- // Clear initial heartbeat messages
+ MockRaftActorContext leaderActorContext = createActorContext();
- leaderActor.underlyingActor().clear();
- followerActor.underlyingActor().clear();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
- // create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- leaderActorContext.setCommitIndex(1);
- leaderActorContext.setLastApplied(1);
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setRaftPolicy(raftPolicy);
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ leader = new Leader(leaderActorContext);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
- AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
- // Should send first log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().get(0).getIndex());
- assertEquals(-1, appendEntries.getPrevLogIndex());
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
- appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ leader.markFollowerInActive("follower-2");
+ return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ }
- assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(0, appendEntriesReply.getLogLastIndex());
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowers");
- followerActor.underlyingActor().clear();
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
- leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+ }
- appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
- // Should send second log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
- follower.close();
+ Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+ behavior instanceof Leader);
}
@Test
for(int i=1;i<6;i++) {
// Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
- RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
assertTrue(newBehavior == leader);
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}
}};
}
+ @Test
+ public void testReplicationConsensusWithNonVotingFollower() {
+ logStart("testReplicationConsensusWithNonVotingFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ String nonVotingFollowerId = "nonvoting-follower";
+ TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+
+ // Ignore initial heartbeats
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send a Replicate message and wait for AppendEntries.
+ sendReplicate(leaderActorContext, 0);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ // Send reply only from the voting follower and verify consensus via ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send another Replicate message
+ sendReplicate(leaderActorContext, 1);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+ AppendEntries.class);
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+ // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+ // Send reply from the voting follower and verify consensus.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInSync() {
+ logStart("testTransferLeadershipWithFollowerInSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithEmptyLog() {
+ logStart("testTransferLeadershipWithEmptyLog");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+ logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Sync up the follower.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerSyncTimeout() {
+ logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Send heartbeats to time out the transfer.
+ for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ verify(mockTransferCohort).abortTransfer();
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {