Enforce non-null entries field in AppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 86caf2a10e6650dd45d9953458127b43adda0620..37a8b6b9065b7177368d4e345d3dc3268ad8d59e 100644 (file)
@@ -12,32 +12,50 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActor;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -45,10 +63,21 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
@@ -56,14 +85,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
 
-    private RaftActorBehavior follower;
+    private Follower follower;
 
     private final short payloadVersion = 5;
 
     @Override
     @After
     public void tearDown() throws Exception {
-        if(follower != null) {
+        if (follower != null) {
             follower.close();
         }
 
@@ -71,8 +100,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new TestFollower(actorContext);
+    protected Follower createBehavior(RaftActorContext actorContext) {
+        return spy(new Follower(actorContext));
     }
 
     @Override
@@ -81,44 +110,71 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected  MockRaftActorContext createActorContext(ActorRef actorRef){
+    protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
-        context.setPayloadVersion(payloadVersion );
+        context.setPayloadVersion(payloadVersion);
         return context;
     }
 
-    private static int getElectionTimeoutCount(RaftActorBehavior follower){
-        if(follower instanceof TestFollower){
-            return ((TestFollower) follower).getElectionTimeoutCount();
-        }
-        return -1;
-    }
-
     @Test
-    public void testThatAnElectionTimeoutIsTriggered(){
+    public void testThatAnElectionTimeoutIsTriggered() {
         MockRaftActorContext actorContext = createActorContext();
         follower = new Follower(actorContext);
 
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
     }
 
     @Test
-    public void testHandleElectionTimeout(){
-        logStart("testHandleElectionTimeout");
+    public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
 
-        follower = new Follower(createActorContext());
+        MockRaftActorContext context = createActorContext();
+        follower = new Follower(context);
 
-        RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
 
         assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+    public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
+
+        MockRaftActorContext context = createActorContext();
+        ((DefaultConfigParamsImpl) context.getConfigParams())
+                .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
+
+        follower = new Follower(context);
+        context.setCurrentBehavior(follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, null);
 
@@ -130,14 +186,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         assertEquals("isVoteGranted", true, reply.isVoteGranted());
         assertEquals("getTerm", term, reply.getTerm());
-        assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
+        verify(follower).scheduleElection(any(FiniteDuration.class));
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, "test");
 
@@ -148,7 +204,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
 
         assertEquals("isVoteGranted", false, reply.isVoteGranted());
-        assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
 
@@ -172,7 +228,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
@@ -194,7 +251,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
@@ -202,7 +260,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
+            throws Exception {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
@@ -219,7 +278,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
@@ -227,7 +287,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
+            throws Exception {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
@@ -243,7 +304,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
@@ -251,8 +313,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
-        logStart("testHandleFirstAppendEntries");
+    public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
+            throws Exception {
+        logStart(
+               "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -267,7 +331,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
@@ -289,7 +354,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
 
@@ -339,7 +405,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
 
@@ -380,7 +447,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
-        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
 
@@ -430,8 +498,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
      * with a commitIndex that is greater than what has been applied to the
      * state machine of the RaftActor, the RaftActor applies the state and
      * sets it current applied state to the commitIndex of the sender.
-     *
-     * @throws Exception
      */
     @Test
     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
@@ -460,8 +526,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
      * This test verifies that when an AppendEntries is received a specific prevLogTerm
      * which does not match the term that is in RaftActors log entry at prevLogIndex
      * then the RaftActor does not change it's state and it returns a failure.
-     *
-     * @throws Exception
      */
     @Test
     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
@@ -474,7 +538,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
+                (short)0);
 
         follower = createBehavior(context);
 
@@ -492,9 +557,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver match that the new
      * entries get added to the log and the log is incremented by the number of
-     * entries received in appendEntries
-     *
-     * @throws Exception
+     * entries received in appendEntries.
      */
     @Test
     public void testHandleAppendEntriesAddNewEntries() {
@@ -546,7 +609,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver are out-of-sync that
      * the log is first corrected by removing the out of sync entries from the
-     * log and then adding in the new entries sent with the AppendEntries message
+     * log and then adding in the new entries sent with the AppendEntries message.
      */
     @Test
     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
@@ -638,10 +701,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleAppendEntriesPreviousLogEntryMissing(){
+    public void testHandleAppendEntriesPreviousLogEntryMissing() {
         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
 
-        MockRaftActorContext context = createActorContext();
+        final MockRaftActorContext context = createActorContext();
 
         // Prepare the receivers log
         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
@@ -708,7 +771,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleAppendEntriesAfterInstallingSnapshot(){
+    public void testHandleAppendEntriesAfterInstallingSnapshot() {
         logStart("testHandleAppendAfterInstallingSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -741,8 +804,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     /**
      * This test verifies that when InstallSnapshot is received by
      * the follower its applied correctly.
-     *
-     * @throws Exception
      */
     @Test
     public void testHandleInstallSnapshot() throws Exception {
@@ -753,16 +814,16 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        ByteString bsSnapshot  = createSnapshot();
+        ByteString bsSnapshot = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
 
-        for(int i = 0; i < totalChunks; i++) {
+        for (int i = 0; i < totalChunks; i++) {
             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
@@ -782,7 +843,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
-        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
         applySnapshot.getCallback().onSuccess();
@@ -792,22 +854,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
 
         chunkIndex = 1;
-        for(InstallSnapshotReply reply: replies) {
+        for (InstallSnapshotReply reply: replies) {
             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
             assertEquals("getTerm", 1, reply.getTerm());
             assertEquals("isSuccess", true, reply.isSuccess());
             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
         }
 
-        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
     }
 
 
     /**
      * Verify that when an AppendEntries is sent to a follower during a snapshot install
      * the Follower short-circuits the processing of the AppendEntries message.
-     *
-     * @throws Exception
      */
     @Test
     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
@@ -820,11 +880,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         ByteString bsSnapshot  = createSnapshot();
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
 
         // Check that snapshot installation is not in progress
-        assertNull(((Follower) follower).getSnapshotTracker());
+        assertNull(follower.getSnapshotTracker());
 
         // Make sure that we have more than 1 chunk to send
         assertTrue(totalChunks > 1);
@@ -835,22 +895,64 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 chunkData, 1, totalChunks));
 
         // Check if snapshot installation is in progress now
-        assertNotNull(((Follower) follower).getSnapshotTracker());
+        assertNotNull(follower.getSnapshotTracker());
 
         // Send an append entry
-        AppendEntries appendEntries = mock(AppendEntries.class);
-        doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
 
         follower.handleMessage(leaderActor, appendEntries);
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-        assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
-        assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
-        assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+        assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+        assertNotNull(follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
 
-        // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
-        verify(appendEntries, never()).getPrevLogIndex();
+        // Send appendEntries with a new term and leader.
+        AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
 
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+        assertEquals("getTerm", 2, reply.getTerm());
+
+        assertNull(follower.getSnapshotTracker());
     }
 
     @Test
@@ -858,6 +960,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testInitialSyncUpWithHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.setCommitIndex(-1);
 
         follower = createBehavior(context);
 
@@ -865,12 +968,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
 
-        for(int i = 0; i < totalChunks; i++) {
+        for (int i = 0; i < totalChunks; i++) {
             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
@@ -927,28 +1030,31 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getTerm", 1, reply.getTerm());
         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
 
-        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
     }
 
     @Test
-    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
         MockRaftActorContext context = createActorContext();
 
         Stopwatch stopwatch = Stopwatch.createStarted();
 
         follower = createBehavior(context);
 
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
 
         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertTrue("Expected Candidate", newBehavior instanceof Candidate);
     }
 
     @Test
-    public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+    public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
         MockRaftActorContext context = createActorContext();
-        context.setConfigParams(new DefaultConfigParamsImpl(){
+        context.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
             public FiniteDuration getElectionTimeOutInterval() {
                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
@@ -959,11 +1065,30 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertSame("handleMessage result", follower, newBehavior);
     }
 
     @Test
-    public void testElectionScheduledWhenAnyRaftRPCReceived(){
+    public void testFollowerSchedulesElectionIfNonVoting() {
+        MockRaftActorContext context = createActorContext();
+        context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
+
+        follower = new Follower(context, "leader", (short)1);
+
+        ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
+                ElectionTimeout.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+        assertSame("handleMessage result", follower, newBehavior);
+        assertNull("Expected null leaderId", follower.getLeaderId());
+    }
+
+    @Test
+    public void testElectionScheduledWhenAnyRaftRPCReceived() {
         MockRaftActorContext context = createActorContext();
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, new RaftRPC() {
@@ -974,25 +1099,238 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 return 100;
             }
         });
-        assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
+        verify(follower).scheduleElection(any(FiniteDuration.class));
     }
 
     @Test
-    public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
+    public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
         MockRaftActorContext context = createActorContext();
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, "non-raft-rpc");
-        assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
-    public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
+    @Test
+    public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 1 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+                MockRaftActor.fromState(snapshot.getState()));
+    }
+
+    @Test
+    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 2 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
+
+        // Reinstate the actor from persistence
+
+        actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+
+        followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
+        assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
+        assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
+        assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), followerRaftActor.get().getState());
+    }
+
+    @Test
+    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+        String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(1);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 0 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
+        assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+                MockRaftActor.fromState(snapshot.getState()));
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                try {
+                    actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
+                            installSnapshotStream), actorRef);
+                } catch (Exception e) {
+                    Throwables.propagate(e);
+                }
+            }
+
+            @Override
+            public void applySnapshot(State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
+            }
+        };
+        return snapshotCohort;
+    }
+
+    public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;
         int size = chunkSize;
         if (chunkSize > snapshotLength) {
             size = snapshotLength;
         } else {
-            if ((start + chunkSize) > snapshotLength) {
+            if (start + chunkSize > snapshotLength) {
                 size = snapshotLength - start;
             }
         }
@@ -1025,11 +1363,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
 
     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
-        return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
+        return new SimpleReplicatedLogEntry(index, term,
                 new MockRaftActorContext.MockPayload(data));
     }
 
-    private ByteString createSnapshot(){
+    private ByteString createSnapshot() {
         HashMap<String, String> followerSnapshot = new HashMap<>();
         followerSnapshot.put("1", "A");
         followerSnapshot.put("2", "B");
@@ -1039,7 +1377,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
 
@@ -1048,28 +1386,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+    protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
             throws Exception {
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
         assertEquals("isSuccess", true, reply.isSuccess());
     }
-
-    private static class TestFollower extends Follower {
-
-        int electionTimeoutCount = 0;
-
-        public TestFollower(RaftActorContext context) {
-            super(context);
-        }
-
-        @Override
-        protected void scheduleElection(FiniteDuration interval) {
-            electionTimeoutCount++;
-            super.scheduleElection(interval);
-        }
-
-        public int getElectionTimeoutCount() {
-            return electionTimeoutCount;
-        }
-    }
 }