Add raftVersion field to AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 9b877a7cdc2b0b3d804a9a31697e7a1f044f7f3e..8529e1926b31ebb88548653594ec867af202b6ea 100644 (file)
@@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -32,14 +35,18 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -139,9 +146,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        return sendReplicate(actorContext, 1, index);
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, index, payload);
+                term, index, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
@@ -183,6 +194,58 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
     }
 
+    @Test
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+        logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        // The raft context is initialized with a couple log entries. However the commitIndex
+        // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+        // committed and applied. Now it regains leadership with a higher term (2).
+        long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+        long newTerm = prevTerm + 1;
+        actorContext.getTermInformation().update(newTerm, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower replies with the leader's current last index and term, simulating that it is
+        // up to date with the leader.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+        // The commit index should not get updated even though consensus was reached. This is b/c the
+        // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+        // from previous terms by counting replicas".
+        assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+        followerActor.underlyingActor().clear();
+
+        // Now replicate a new entry with the new term 2.
+        long newIndex = lastIndex + 1;
+        sendReplicate(actorContext, newTerm, newIndex);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+        // The follower replies with success. The leader should now update the commit index to the new index
+        // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+        // prior entries are committed indirectly".
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+        assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+    }
+
     @Test
     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
@@ -601,11 +664,6 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        Map<String, String> leadersSnapshot = new HashMap<>();
-        leadersSnapshot.put("1", "A");
-        leadersSnapshot.put("2", "B");
-        leadersSnapshot.put("3", "C");
-
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
@@ -656,6 +714,69 @@ public class LeaderTest extends AbstractLeaderTest {
         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
+    @Test
+    public void testInitiateForceInstallSnapshot() throws Exception {
+        logStart("testInitiateForceInstallSnapshot");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        final int followersLastIndex = 2;
+        final int snapshotIndex = -1;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = -1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setLastApplied(3);
+        actorContext.setCommitIndex(followersLastIndex);
+
+        actorContext.getReplicatedLog().removeFrom(0);
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // set the snapshot as absent and check if capture-snapshot is invoked.
+        leader.setSnapshot(null);
+
+        for(int i=0;i<4;i++) {
+            actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+                    new MockRaftActorContext.MockPayload("X" + i)));
+        }
+
+        // new entry
+        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                new MockRaftActorContext.MockPayload("D"));
+
+        actorContext.getReplicatedLog().append(entry);
+
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
+
+        // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+        // installed with a SendInstallSnapshot
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
+
+        assertTrue(cs.isInstallSnapshotInitiated());
+        assertEquals(3, cs.getLastAppliedIndex());
+        assertEquals(1, cs.getLastAppliedTerm());
+        assertEquals(4, cs.getLastIndex());
+        assertEquals(2, cs.getLastTerm());
+
+        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+    }
+
+
     @Test
     public void testInstallSnapshot() throws Exception {
         logStart("testInstallSnapshot");
@@ -709,6 +830,56 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(currentTerm, installSnapshot.getTerm());
     }
 
+    @Test
+    public void testForceInstallSnapshot() throws Exception {
+        logStart("testForceInstallSnapshot");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = -1;
+        final int snapshotTerm = -1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
+
+        leader = new Leader(actorContext);
+
+        // Initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+
+        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+
+        assertTrue(raftBehavior instanceof Leader);
+
+        // check if installsnapshot gets called with the correct values.
+
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        assertNotNull(installSnapshot.getData());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+        assertEquals(currentTerm, installSnapshot.getTerm());
+    }
+
     @Test
     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
         logStart("testHandleInstallSnapshotReplyLastChunk");
@@ -1488,7 +1659,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
         short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
@@ -1515,8 +1689,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
 
-        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(2, followerInfo.getMatchIndex());
+        assertEquals(3, followerInfo.getNextIndex());
         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+        assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
     }
 
     @Test
@@ -1541,7 +1717,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
-        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
@@ -1771,6 +1947,215 @@ public class LeaderTest extends AbstractLeaderTest {
         }};
     }
 
+    @Test
+    public void testReplicationConsensusWithNonVotingFollower() {
+        logStart("testReplicationConsensusWithNonVotingFollower");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        String nonVotingFollowerId = "nonvoting-follower";
+        TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+                Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+        leader = new Leader(leaderActorContext);
+
+        // Ignore initial heartbeats
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Send a Replicate message and wait for AppendEntries.
+        sendReplicate(leaderActorContext, 0);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+        // Send reply only from the voting follower and verify consensus via ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Send another Replicate message
+        sendReplicate(leaderActorContext, 1);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+                AppendEntries.class);
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+        // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+        MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+        // Send reply from the voting follower and verify consensus.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerInSync() {
+        logStart("testTransferLeadershipWithFollowerInSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithEmptyLog() {
+        logStart("testTransferLeadershipWithEmptyLog");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+        logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Sync up the follower.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerSyncTimeout() {
+        logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Send heartbeats to time out the transfer.
+        for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                    getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        verify(mockTransferCohort).abortTransfer();
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {