sal-akka-raft: use lambdas
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 6964db51f273f52ac0591b80cf75edf5997ec68c..a8ecf71c47943c023cdbd64f0a63462b4084ef09 100644 (file)
@@ -1,8 +1,24 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -11,44 +27,58 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.opendaylight.yangtools.concepts.Identifier;
 import scala.concurrent.duration.FiniteDuration;
 
-public class LeaderTest extends AbstractLeaderTest {
+public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     static final String FOLLOWER_ID = "follower";
+    public static final String LEADER_ID = "leader";
 
     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
@@ -57,11 +87,12 @@ public class LeaderTest extends AbstractLeaderTest {
             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
 
     private Leader leader;
+    private final short payloadVersion = 5;
 
     @Override
     @After
     public void tearDown() throws Exception {
-        if(leader != null) {
+        if (leader != null) {
             leader.close();
         }
 
@@ -74,10 +105,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(createActorContext());
 
-        // handle message should return the Leader state when it receives an
-        // unknown message
-        RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
-        Assert.assertTrue(behavior instanceof Leader);
+        // handle message should null when it receives an unknown message
+        assertNull(leader.handleMessage(followerActor, "foo"));
     }
 
     @Test
@@ -85,32 +114,36 @@ public class LeaderTest extends AbstractLeaderTest {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
+        actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
         actorContext.getTermInformation().update(term, "");
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader should send an immediate heartbeat with no entries as follower is inactive.
-        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        final long lastIndex = actorContext.getReplicatedLog().lastIndex();
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("getTerm", term, appendEntries.getTerm());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
         assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
 
         // The follower would normally reply - simulate that explicitly here.
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex - 1, term));
+                FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
 
         // Sleep for the heartbeat interval so AppendEntries is sent.
-        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
-                getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
@@ -118,6 +151,19 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+    }
+
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+        return sendReplicate(actorContext, 1, index);
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
     }
 
     @Test
@@ -137,17 +183,105 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
 
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, lastIndex + 1, payload);
-        actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(null, null, newEntry));
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+        assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+    }
+
+    @Test
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+        logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
+        actorContext.setLastApplied(-1);
+
+        // The raft context is initialized with a couple log entries. However the commitIndex
+        // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+        // committed and applied. Now it regains leadership with a higher term (2).
+        long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+        long newTerm = prevTerm + 1;
+        actorContext.getTermInformation().update(newTerm, "");
+
+        leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower replies with the leader's current last index and term, simulating that it is
+        // up to date with the leader.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+        // The commit index should not get updated even though consensus was reached. This is b/c the
+        // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+        // from previous terms by counting replicas".
+        assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+        followerActor.underlyingActor().clear();
+
+        // Now replicate a new entry with the new term 2.
+        long newIndex = lastIndex + 1;
+        sendReplicate(actorContext, newTerm, newIndex);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+        // The follower replies with success. The leader should now update the commit index to the new index
+        // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+        // prior entries are committed indirectly".
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+        assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+    }
+
+    @Test
+    public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+        logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -158,9 +292,222 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+        assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
+    }
+
+    @Test
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for (int i = 0; i < 5; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect only 1 message to be sent because of two reasons,
+        // - an append entries reply was not received
+        // - the heartbeat interval has not expired
+        // In this scenario if multiple messages are sent they would likely be duplicates
+        assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+    }
+
+    @Test
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+        logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for (int i = 0; i < 3; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+            leader.handleMessage(followerActor, new AppendEntriesReply(
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
+
+        }
+
+        for (int i = 3; i < 5; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+        // get sent to the follower - but not the 5th
+        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+        for (int i = 0; i < 4; i++) {
+            long expected = allMessages.get(i).getEntries().get(0).getIndex();
+            assertEquals(expected, i + 2);
+        }
+    }
+
+    @Test
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+        logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        sendReplicate(actorContext, lastIndex + 1);
+
+        // Wait slightly longer than heartbeat duration
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(1, allMessages.get(0).getEntries().size());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
+
+    }
+
+    @Test
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+        logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for (int i = 0; i < 3; i++) {
+            Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+    }
+
+    @Test
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+        logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+        sendReplicate(actorContext, lastIndex + 1);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(0, allMessages.get(0).getEntries().size());
+        assertEquals(1, allMessages.get(1).getEntries().size());
     }
 
+
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
@@ -173,13 +520,14 @@ public class LeaderTest extends AbstractLeaderTest {
 
         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
         long term = actorContext.getTermInformation().getCurrentTerm();
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
+        ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
+                newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
 
         actorContext.getReplicatedLog().append(newEntry);
 
+        final Identifier id = new MockIdentifier("state-id");
         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(leaderActor, "state-id", newEntry));
+                new Replicate(leaderActor, id, newEntry, true));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -192,7 +540,7 @@ public class LeaderTest extends AbstractLeaderTest {
                 leaderActor, ApplyState.class);
         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-        for(int i = 0; i <= newLogIndex - 1; i++ ) {
+        for (int i = 0; i <= newLogIndex - 1; i++) {
             ApplyState applyState = applyStateList.get(i);
             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
@@ -200,14 +548,14 @@ public class LeaderTest extends AbstractLeaderTest {
 
         ApplyState last = applyStateList.get((int) newLogIndex - 1);
         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
-        assertEquals("getIdentifier", "state-id", last.getIdentifier());
+        assertEquals("getIdentifier", id, last.getIdentifier());
     }
 
     @Test
     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -217,33 +565,33 @@ public class LeaderTest extends AbstractLeaderTest {
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
-        final int newEntryIndex = 4;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
-        final int currentTerm = 2;
 
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
         //set follower timeout to 2 mins, helps during debugging
         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
         leader = new Leader(actorContext);
 
-        // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                        new MockRaftActorContext.MockPayload("D"));
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
         fts.getNextChunk();
@@ -252,32 +600,27 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
-        AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+        AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
         //InstallSnapshotReply received
         fts.markSendStatus(true);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
-
-        InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
-                InstallSnapshot.SERIALIZABLE_CLASS);
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
-        InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+        InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
-        assertEquals(snapshotIndex, is.getLastIncludedIndex());
+        assertEquals(commitIndex, is.getLastIncludedIndex());
     }
 
     @Test
     public void testSendAppendEntriesSnapshotScenario() throws Exception {
         logStart("testSendAppendEntriesSnapshotScenario");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -304,20 +647,22 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry =
+                new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
+        actorContext.getReplicatedLog().append(entry);
+
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, "state-id", entry));
+                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertTrue(raftBehavior instanceof Leader);
 
-        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
     }
 
     @Test
@@ -326,11 +671,6 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        Map<String, String> leadersSnapshot = new HashMap<>();
-        leadersSnapshot.put("1", "A");
-        leadersSnapshot.put("2", "B");
-        leadersSnapshot.put("3", "C");
-
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
@@ -352,10 +692,10 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(Optional.<ByteString>absent());
+        leader.setSnapshot(null);
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -363,65 +703,210 @@ public class LeaderTest extends AbstractLeaderTest {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
-        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
-        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test
-    public void testInstallSnapshot() throws Exception {
-        logStart("testInstallSnapshot");
+    public void testInitiateForceInstallSnapshot() throws Exception {
+        logStart("testInitiateForceInstallSnapshot");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        Map<String, String> leadersSnapshot = new HashMap<>();
-        leadersSnapshot.put("1", "A");
-        leadersSnapshot.put("2", "B");
-        leadersSnapshot.put("3", "C");
-
-        //clears leaders log
-        actorContext.getReplicatedLog().removeFrom(0);
-
         final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
-        final int snapshotTerm = 1;
+        final int snapshotIndex = -1;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = -1;
         final int currentTerm = 2;
 
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setLastApplied(3);
         actorContext.setCommitIndex(followersLastIndex);
 
+        actorContext.getReplicatedLog().removeFrom(0);
+
+        AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+        actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
+
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
-        // Ignore initial heartbeat.
+        // Leader will send an immediate heartbeat - ignore it.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+        // set the snapshot as absent and check if capture-snapshot is invoked.
+        leader.setSnapshot(null);
 
-        assertTrue(raftBehavior instanceof Leader);
+        for (int i = 0; i < 4; i++) {
+            actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
+                    new MockRaftActorContext.MockPayload("X" + i)));
+        }
+
+        // new entry
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
+                new MockRaftActorContext.MockPayload("D"));
+
+        actorContext.getReplicatedLog().append(entry);
+
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
+
+        // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+        // installed with a SendInstallSnapshot
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
+        assertEquals(3, cs.getLastAppliedIndex());
+        assertEquals(1, cs.getLastAppliedTerm());
+        assertEquals(4, cs.getLastIndex());
+        assertEquals(2, cs.getLastTerm());
+
+        assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+        assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Sending Replicate message should not initiate another capture since the first is in progress.
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+        final byte[] bytes = new byte[]{1, 2, 3};
+        installSnapshotStream.get().get().write(bytes);
+        actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+                Runtime.getRuntime().totalMemory());
+        MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+        MessageCollectorActor.clearMessages(followerActor);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
+    }
+
+
+    @Test
+    public void testInstallSnapshot() throws Exception {
+        logStart("testInstallSnapshot");
+
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
+
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = 2;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
+
+        leader = new Leader(actorContext);
+
+        // Initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
+
+        assertTrue(raftBehavior instanceof Leader);
+
+        // check if installsnapshot gets called with the correct values.
+
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
+
+        assertNotNull(installSnapshot.getData());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+        assertEquals(currentTerm, installSnapshot.getTerm());
+    }
+
+    @Test
+    public void testForceInstallSnapshot() throws Exception {
+        logStart("testForceInstallSnapshot");
+
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = -1;
+        final int snapshotTerm = -1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
+
+        leader = new Leader(actorContext);
+
+        // Initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
+
+        assertTrue(raftBehavior instanceof Leader);
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
-                MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
-        assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
 
         assertEquals(currentTerm, installSnapshot.getTerm());
@@ -433,14 +918,18 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
         // Ignore initial heartbeat.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -457,10 +946,14 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
-        while(!fts.isLastChunk(fts.getChunkIndex())) {
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
+        while (!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
         }
@@ -473,13 +966,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        assertEquals(0, leader.followerSnapshotSize());
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex + 1, fli.getNextIndex());
+        assertNull(fli.getInstallSnapshotState());
+        assertEquals(commitIndex, fli.getMatchIndex());
+        assertEquals(commitIndex + 1, fli.getNextIndex());
+        assertFalse(leader.hasSnapshot());
     }
 
     @Test
@@ -488,12 +981,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
@@ -503,9 +996,13 @@ public class LeaderTest extends AbstractLeaderTest {
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
         actorContext.setConfigParams(configParams);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -518,12 +1015,14 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -532,8 +1031,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(2, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -542,43 +1040,44 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
         followerActor.underlyingActor().clear();
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.getFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
 
-        Assert.assertNull(installSnapshot);
+        assertNull(installSnapshot);
     }
 
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -590,12 +1089,15 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -608,10 +1110,9 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -623,8 +1124,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -635,10 +1136,13 @@ public class LeaderTest extends AbstractLeaderTest {
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -650,46 +1154,37 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+                installSnapshot.getLastChunkHashCode().get().intValue());
 
-        int hashCode = installSnapshot.getData().hashCode();
+        final int hashCode = Arrays.hashCode(installSnapshot.getData());
 
         followerActor.underlyingActor().clear();
 
         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
                 FOLLOWER_ID, 1, true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(2, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+        assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
     }
 
     @Test
-    public void testFollowerToSnapshotLogic() {
-        logStart("testFollowerToSnapshotLogic");
-
-        MockRaftActorContext actorContext = createActorContext();
-
-        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-            @Override
-            public int getSnapshotChunkSize() {
-                return 50;
-            }
-        });
-
-        leader = new Leader(actorContext);
+    public void testLeaderInstallSnapshotState() throws IOException {
+        logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -699,22 +1194,22 @@ public class LeaderTest extends AbstractLeaderTest {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
+        fts.setSnapshotBytes(ByteSource.wrap(barray));
 
         assertEquals(bs.size(), barray.length);
 
-        int chunkIndex=0;
-        for (int i=0; i < barray.length; i = i + 50) {
-            int j = i + 50;
+        int chunkIndex = 0;
+        for (int i = 0; i < barray.length; i = i + 50) {
+            int length = i + 50;
             chunkIndex++;
 
             if (i + 50 > barray.length) {
-                j = barray.length;
+                length = barray.length;
             }
 
-            ByteString chunk = fts.getNextChunk();
-            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            byte[] chunk = fts.getNextChunk();
+            assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
             fts.markSendStatus(true);
@@ -724,10 +1219,11 @@ public class LeaderTest extends AbstractLeaderTest {
         }
 
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+        fts.close();
     }
 
-    @Override protected RaftActorBehavior createBehavior(
-        RaftActorContext actorContext) {
+    @Override
+    protected Leader createBehavior(final RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
@@ -738,14 +1234,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
     @Override
     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
-        return createActorContext("leader", actorRef);
-    }
-
-    private MockRaftActorContext createActorContextWithFollower() {
-        MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
-                followerActor.path().toString()).build());
-        return actorContext;
+        return createActorContext(LEADER_ID, actorRef);
     }
 
     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
@@ -754,19 +1243,37 @@ public class LeaderTest extends AbstractLeaderTest {
         configParams.setElectionTimeoutFactor(100000);
         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
         context.setConfigParams(configParams);
+        context.setPayloadVersion(payloadVersion);
         return context;
     }
 
+    private MockRaftActorContext createActorContextWithFollower() {
+        MockRaftActorContext actorContext = createActorContext();
+        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+                followerActor.path().toString()).build());
+        return actorContext;
+    }
+
+    private MockRaftActorContext createFollowerActorContextWithLeader() {
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+        followerConfig.setElectionTimeoutFactor(10000);
+        followerActorContext.setConfigParams(followerConfig);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+        return followerActorContext;
+    }
+
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -790,10 +1297,11 @@ public class LeaderTest extends AbstractLeaderTest {
         followerActorContext.setCommitIndex(1);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -814,17 +1322,19 @@ public class LeaderTest extends AbstractLeaderTest {
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContext();
+        final MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
-        Map<String, String> peerAddresses = new HashMap<>();
-        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+        Map<String, String> leaderPeerAddresses = new HashMap<>();
+        leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setPeerAddresses(leaderPeerAddresses);
 
         leaderActorContext.getReplicatedLog().removeFrom(0);
 
@@ -846,7 +1356,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // Initial heartbeat
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -865,7 +1375,7 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -884,26 +1394,298 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailure(){
-        logStart("testHandleAppendEntriesReplyFailure");
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        long leaderCommitIndex = 2;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(0);
+        followerActorContext.setLastApplied(0);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
-        // Send initial heartbeat reply with last index.
-        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
-        assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+        assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
 
-        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+    }
 
-        assertEquals(RaftState.Leader, raftActorBehavior.state());
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyWithNewerTerm() {
+        logStart("testHandleAppendEntriesReplyWithNewerTerm");
+
+        MockRaftActorContext leaderActorContext = createActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(10000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+
+        leader = new Leader(leaderActorContext);
+        leaderActor.underlyingActor().setBehavior(leader);
+        leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        assertEquals(false, appendEntriesReply.isSuccess());
+        assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+        MessageCollectorActor.clearMessages(leaderActor);
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
+        logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
 
-        assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+        MockRaftActorContext leaderActorContext = createActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(10000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+        leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
+
+        leader = new Leader(leaderActorContext);
+        leaderActor.underlyingActor().setBehavior(leader);
+        leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        assertEquals(false, appendEntriesReply.isSuccess());
+        assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+        MessageCollectorActor.clearMessages(leaderActor);
     }
 
     @Test
@@ -921,7 +1703,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
+        assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -944,17 +1731,22 @@ public class LeaderTest extends AbstractLeaderTest {
         ApplyState applyState = applyStateList.get(0);
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+        assertEquals(2, followerInfo.getMatchIndex());
+        assertEquals(3, followerInfo.getNextIndex());
+        assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+        assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
     }
 
     @Test
-    public void testHandleAppendEntriesReplyUnknownFollower(){
+    public void testHandleAppendEntriesReplyUnknownFollower() {
         logStart("testHandleAppendEntriesReplyUnknownFollower");
 
         MockRaftActorContext leaderActorContext = createActorContext();
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -962,7 +1754,93 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Test
-    public void testHandleRequestVoteReply(){
+    public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+        logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+        long leaderCommitIndex = 3;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
+                AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+        MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+        assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testHandleRequestVoteReply() {
         logStart("testHandleRequestVoteReply");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -987,200 +1865,373 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         leader = new Leader(leaderActorContext);
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-        Assert.assertTrue(behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue(newBehavior instanceof Leader);
     }
 
     @Test
-    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
-        logStart("testIsolatedLeaderCheckTwoFollowers");
+    public void testIsolatedLeaderCheckNoVotingFollowers() {
+        logStart("testIsolatedLeaderCheckNoVotingFollowers");
 
-        new JavaTestKit(getSystem()) {{
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
 
-            ActorRef followerActor1 = getTestActor();
-            ActorRef followerActor2 = getTestActor();
+        leader = new Leader(leaderActorContext);
+        leader.getFollower(FOLLOWER_ID).markFollowerActive();
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", newBehavior instanceof Leader);
+    }
 
-            MockRaftActorContext leaderActorContext = createActorContext();
+    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+        ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
+        ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower-1", followerActor1.path().toString());
-            peerAddresses.put("follower-2", followerActor2.path().toString());
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put("follower-1", followerActor1.path().toString());
+        peerAddresses.put("follower-2", followerActor2.path().toString());
 
-            leader = new Leader(leaderActorContext);
+        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setRaftPolicy(raftPolicy);
 
-            leader.markFollowerActive("follower-1");
-            leader.markFollowerActive("follower-2");
-            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-            Assert.assertTrue("Behavior not instance of Leader when all followers are active",
-                behavior instanceof Leader);
+        leader = new Leader(leaderActorContext);
 
-            // kill 1 follower and verify if that got killed
-            final JavaTestKit probe = new JavaTestKit(getSystem());
-            probe.watch(followerActor1);
-            followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
-            assertEquals(termMsg1.getActor(), followerActor1);
+        leader.markFollowerActive("follower-1");
+        leader.markFollowerActive("follower-2");
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
+
+        // kill 1 follower and verify if that got killed
+        final JavaTestKit probe = new JavaTestKit(getSystem());
+        probe.watch(followerActor1);
+        followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+        assertEquals(termMsg1.getActor(), followerActor1);
+
+        leader.markFollowerInActive("follower-1");
+        leader.markFollowerActive("follower-2");
+        newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when majority of followers are active",
+                newBehavior instanceof Leader);
+
+        // kill 2nd follower and leader should change to Isolated leader
+        followerActor2.tell(PoisonPill.getInstance(), null);
+        probe.watch(followerActor2);
+        followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+        assertEquals(termMsg2.getActor(), followerActor2);
+
+        leader.markFollowerInActive("follower-2");
+        return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+    }
 
-            leader.markFollowerInActive("follower-1");
-            leader.markFollowerActive("follower-2");
-            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-            Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
-                behavior instanceof Leader);
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        logStart("testIsolatedLeaderCheckTwoFollowers");
 
-            // kill 2nd follower and leader should change to Isolated leader
-            followerActor2.tell(PoisonPill.getInstance(), null);
-            probe.watch(followerActor2);
-            followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
-            assertEquals(termMsg2.getActor(), followerActor2);
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
 
-            leader.markFollowerInActive("follower-2");
-            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-            Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
-                behavior instanceof IsolatedLeader);
-        }};
+        assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+            newBehavior instanceof IsolatedLeader);
     }
 
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+        logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
+
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+
+        assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+                newBehavior instanceof Leader);
+    }
 
     @Test
-    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        logStart("testAppendEntryCallAtEndofAppendEntryReply");
+    public void testLaggingFollowerStarvation() throws Exception {
+        logStart("testLaggingFollowerStarvation");
 
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        String leaderActorId = actorFactory.generateActorId("leader");
+        String follower1ActorId = actorFactory.generateActorId("follower");
+        String follower2ActorId = actorFactory.generateActorId("follower");
+
+        final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+        final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+        MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
 
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+        configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
         leaderActorContext.setConfigParams(configParams);
 
-        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
 
-        followerActorContext.setConfigParams(configParams);
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(follower1ActorId,
+                follower1Actor.path().toString());
+        peerAddresses.put(follower2ActorId,
+                follower2Actor.path().toString());
 
-        Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().setBehavior(follower);
+        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getTermInformation().update(1, leaderActorId);
 
-        leaderActorContext.getReplicatedLog().removeFrom(0);
+        leader = createBehavior(leaderActorContext);
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        for (int i = 1; i < 6; i++) {
+            // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+            RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
+                    new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+            assertTrue(newBehavior == leader);
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+
+        // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+        List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+        assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+                heartbeats.size() > 1);
+
+        // Check if follower-2 got AppendEntries during this time and was not starved
+        List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+        assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+                appendEntries.size() > 1);
+    }
+
+    @Test
+    public void testReplicationConsensusWithNonVotingFollower() {
+        logStart("testReplicationConsensusWithNonVotingFollower");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         leaderActorContext.setCommitIndex(-1);
         leaderActorContext.setLastApplied(-1);
 
-        followerActorContext.getReplicatedLog().removeFrom(0);
-        followerActorContext.setCommitIndex(-1);
-        followerActorContext.setLastApplied(-1);
+        String nonVotingFollowerId = "nonvoting-follower";
+        TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+                Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
+                VotingState.NON_VOTING);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
-                leaderActor, AppendEntriesReply.class);
+        // Ignore initial heartbeats
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
 
-        leader.handleMessage(followerActor, appendEntriesReply);
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
 
-        // Clear initial heartbeat messages
+        // Send a Replicate message and wait for AppendEntries.
+        sendReplicate(leaderActorContext, 0);
 
-        leaderActor.underlyingActor().clear();
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
 
-        // create 3 entries
-        leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-        leaderActorContext.setCommitIndex(1);
-        leaderActorContext.setLastApplied(1);
+        // Send reply only from the voting follower and verify consensus via ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
 
-        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
 
-        // Should send first log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-        assertEquals(-1, appendEntries.getPrevLogIndex());
+        // Send another Replicate message
+        sendReplicate(leaderActorContext, 1);
 
-        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+                AppendEntries.class);
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
 
-        assertEquals(1, appendEntriesReply.getLogLastTerm());
-        assertEquals(0, appendEntriesReply.getLogLastIndex());
+        // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
 
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
 
-        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+        // Send reply from the voting follower and verify consensus.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
 
-        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+    }
 
-        // Should send second log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+    @Test
+    public void testTransferLeadershipWithFollowerInSync() {
+        logStart("testTransferLeadershipWithFollowerInSync");
 
-        follower.close();
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        leaderActorContext.setLastApplied(-1);
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+
+        verify(mockTransferCohort).transferComplete();
     }
 
     @Test
-    public void testLaggingFollowerStarvation() throws Exception {
-        logStart("testLaggingFollowerStarvation");
-        new JavaTestKit(getSystem()) {{
-            String leaderActorId = actorFactory.generateActorId("leader");
-            String follower1ActorId = actorFactory.generateActorId("follower");
-            String follower2ActorId = actorFactory.generateActorId("follower");
+    public void testTransferLeadershipWithEmptyLog() {
+        logStart("testTransferLeadershipWithEmptyLog");
 
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
-                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
-            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
-            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
 
-            leaderActorContext.setConfigParams(configParams);
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        leader.transferLeadership(mockTransferCohort);
 
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(follower1ActorId,
-                    follower1Actor.path().toString());
-            peerAddresses.put(follower2ActorId,
-                    follower2Actor.path().toString());
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
-            leaderActorContext.getTermInformation().update(1, leaderActorId);
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
-            RaftActorBehavior leader = createBehavior(leaderActorContext);
+        verify(mockTransferCohort).transferComplete();
+    }
 
-            leaderActor.underlyingActor().setBehavior(leader);
+    @Test
+    public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+        logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
 
-            for(int i=1;i<6;i++) {
-                // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
-                assertTrue(newBehavior == leader);
-                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
-            }
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
 
-            // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
-            List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
-            assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
-                    heartbeats.size() > 1);
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
 
-            // Check if follower-2 got AppendEntries during this time and was not starved
-            List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+        // Sync up the follower.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
 
-            assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
-                    appendEntries.size() > 1);
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerSyncTimeout() {
+        logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Send heartbeats to time out the transfer.
+        for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                    .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+        }
 
-        }};
+        verify(mockTransferCohort).abortTransfer();
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
@@ -1191,7 +2242,7 @@ public class LeaderTest extends AbstractLeaderTest {
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
-        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+        MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
             super();
             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
             this.snapshotChunkSize = snapshotChunkSize;