Remove MockReplicatedLogEntry
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index eb81e512f3ce3590d31f0139a6a12aa48c7ccc8f..a842069c544ac59169cf362022f1b090e4b58b1a 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -16,6 +17,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -41,7 +43,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -51,7 +52,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -83,7 +84,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     @Override
     @After
     public void tearDown() throws Exception {
-        if(leader != null) {
+        if (leader != null) {
             leader.close();
         }
 
@@ -106,7 +107,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
         actorContext.setCommitIndex(-1);
-        short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
@@ -116,7 +116,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.setCurrentBehavior(leader);
 
         // Leader should send an immediate heartbeat with no entries as follower is inactive.
-        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        final long lastIndex = actorContext.getReplicatedLog().lastIndex();
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("getTerm", term, appendEntries.getTerm());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
@@ -132,8 +132,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         followerActor.underlyingActor().clear();
 
         // Sleep for the heartbeat interval so AppendEntries is sent.
-        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
-                getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
@@ -147,14 +147,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
         return sendReplicate(actorContext, 1, index);
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, index, payload);
+        SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
@@ -202,6 +201,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
         actorContext.setCommitIndex(-1);
+        actorContext.setLastApplied(-1);
 
         // The raft context is initialized with a couple log entries. However the commitIndex
         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
@@ -285,7 +285,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
-        assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
+        assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
     }
 
     @Test
@@ -316,8 +316,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<5;i++) {
-            sendReplicate(actorContext, lastIndex+i+1);
+        for (int i = 0; i < 5; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
         }
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
@@ -356,14 +356,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<3;i++) {
-            sendReplicate(actorContext, lastIndex+i+1);
+        for (int i = 0; i < 3; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
 
         }
 
-        for(int i=3;i<5;i++) {
+        for (int i = 3; i < 5; i++) {
             sendReplicate(actorContext, lastIndex + i + 1);
         }
 
@@ -372,9 +372,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // get sent to the follower - but not the 5th
         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
 
-        for(int i=0;i<4;i++) {
+        for (int i = 0; i < 4; i++) {
             long expected = allMessages.get(i).getEntries().get(0).getIndex();
-            assertEquals(expected, i+2);
+            assertEquals(expected, i + 2);
         }
     }
 
@@ -406,7 +406,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        sendReplicate(actorContext, lastIndex+1);
+        sendReplicate(actorContext, lastIndex + 1);
 
         // Wait slightly longer than heartbeat duration
         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
@@ -417,9 +417,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
 
         assertEquals(1, allMessages.get(0).getEntries().size());
-        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
         assertEquals(1, allMessages.get(1).getEntries().size());
-        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
 
     }
 
@@ -451,7 +451,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<3;i++) {
+        for (int i = 0; i < 3; i++) {
             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
@@ -490,7 +490,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
-        sendReplicate(actorContext, lastIndex+1);
+        sendReplicate(actorContext, lastIndex + 1);
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
@@ -512,8 +512,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
         long term = actorContext.getTermInformation().getCurrentTerm();
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
+        ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
+                newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
 
         actorContext.getReplicatedLog().append(newEntry);
 
@@ -531,7 +531,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 leaderActor, ApplyState.class);
         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-        for(int i = 0; i <= newLogIndex - 1; i++ ) {
+        for (int i = 0; i <= newLogIndex - 1; i++ ) {
             ApplyState applyState = applyStateList.get(i);
             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
@@ -546,7 +546,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -558,9 +558,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         final int commitIndex = 3;
         final int snapshotIndex = 2;
-        final int newEntryIndex = 4;
         final int snapshotTerm = 1;
-        final int currentTerm = 2;
 
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
@@ -574,19 +572,16 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                        new MockRaftActorContext.MockPayload("D"));
-
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
         fts.getNextChunk();
@@ -615,7 +610,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testSendAppendEntriesSnapshotScenario() throws Exception {
         logStart("testSendAppendEntriesSnapshotScenario");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -642,8 +637,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry =
+                new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -690,7 +685,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.setSnapshot(null);
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -737,6 +732,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getReplicatedLog().removeFrom(0);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader will send an immediate heartbeat - ignore it.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -744,13 +740,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // set the snapshot as absent and check if capture-snapshot is invoked.
         leader.setSnapshot(null);
 
-        for(int i=0;i<4;i++) {
-            actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+        for (int i = 0; i < 4; i++) {
+            actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
                     new MockRaftActorContext.MockPayload("X" + i)));
         }
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -760,7 +756,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -783,7 +779,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testInstallSnapshot() throws Exception {
         logStart("testInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -823,7 +819,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
@@ -836,7 +833,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testForceInstallSnapshot() throws Exception {
         logStart("testForceInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -873,7 +870,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
@@ -918,9 +916,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
-        while(!fts.isLastChunk(fts.getChunkIndex())) {
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
+        while (!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
         }
@@ -933,12 +933,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        assertEquals(0, leader.followerSnapshotSize());
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
+        assertNull(fli.getInstallSnapshotState());
         assertEquals(commitIndex, fli.getMatchIndex());
         assertEquals(commitIndex + 1, fli.getNextIndex());
+        assertFalse(leader.hasSnapshot());
     }
 
     @Test
@@ -952,7 +953,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
@@ -987,7 +988,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -1019,7 +1021,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -1029,7 +1031,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
@@ -1061,7 +1063,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -1124,13 +1127,15 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+                installSnapshot.getLastChunkHashCode().get().intValue());
 
-        int hashCode = Arrays.hashCode(installSnapshot.getData());
+        final int hashCode = Arrays.hashCode(installSnapshot.getData());
 
         followerActor.underlyingActor().clear();
 
@@ -1145,19 +1150,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testFollowerToSnapshotLogic() {
-        logStart("testFollowerToSnapshotLogic");
-
-        MockRaftActorContext actorContext = createActorContext();
-
-        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-            @Override
-            public int getSnapshotChunkSize() {
-                return 50;
-            }
-        });
-
-        leader = new Leader(actorContext);
+    public void testLeaderInstallSnapshotState() {
+        logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -1167,22 +1161,22 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
+        fts.setSnapshotBytes(bs);
 
         assertEquals(bs.size(), barray.length);
 
-        int chunkIndex=0;
-        for (int i=0; i < barray.length; i = i + 50) {
-            int j = i + 50;
+        int chunkIndex = 0;
+        for (int i = 0; i < barray.length; i = i + 50) {
+            int length = i + 50;
             chunkIndex++;
 
             if (i + 50 > barray.length) {
-                j = barray.length;
+                length = barray.length;
             }
 
             byte[] chunk = fts.getNextChunk();
-            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
+            assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
             fts.markSendStatus(true);
@@ -1209,13 +1203,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return createActorContext(LEADER_ID, actorRef);
     }
 
-    private MockRaftActorContext createActorContextWithFollower() {
-        MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
-                followerActor.path().toString()).build());
-        return actorContext;
-    }
-
     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
@@ -1226,6 +1213,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return context;
     }
 
+    private MockRaftActorContext createActorContextWithFollower() {
+        MockRaftActorContext actorContext = createActorContext();
+        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+                followerActor.path().toString()).build());
+        return actorContext;
+    }
+
     private MockRaftActorContext createFollowerActorContextWithLeader() {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
@@ -1239,7 +1233,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
@@ -1273,7 +1267,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1294,7 +1288,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContext();
+        final MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
@@ -1328,7 +1322,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // Initial heartbeat
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1366,7 +1360,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1379,8 +1373,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
-        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1395,13 +1389,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
-        // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        // Verify initial AppendEntries sent.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
 
@@ -1458,8 +1453,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1474,13 +1469,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1525,7 +1521,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1538,8 +1534,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1555,13 +1551,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1609,7 +1606,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyWithNewerTerm(){
+    public void testHandleAppendEntriesReplyWithNewerTerm() {
         logStart("testHandleAppendEntriesReplyWithNewerTerm");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1623,7 +1620,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActor.underlyingActor().setBehavior(leader);
         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
 
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         assertEquals(false, appendEntriesReply.isSuccess());
         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
@@ -1632,7 +1630,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+    public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1647,7 +1645,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActor.underlyingActor().setBehavior(leader);
         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
 
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         assertEquals(false, appendEntriesReply.isSuccess());
         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
@@ -1675,7 +1674,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
-        short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
@@ -1707,7 +1705,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyUnknownFollower(){
+    public void testHandleAppendEntriesReplyUnknownFollower() {
         logStart("testHandleAppendEntriesReplyUnknownFollower");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1736,10 +1734,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
-        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
-        ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1754,13 +1752,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
 
@@ -1769,7 +1768,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
+                AppendEntries.class, 2);
         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
 
         appendEntries = appendEntriesList.get(0);
@@ -1806,7 +1806,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleRequestVoteReply(){
+    public void testHandleRequestVoteReply() {
         logStart("testHandleRequestVoteReply");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1831,8 +1831,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         leader = new Leader(leaderActorContext);
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue(behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue(newBehavior instanceof Leader);
     }
 
     @Test
@@ -1850,11 +1850,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader = new Leader(leaderActorContext);
         leader.getFollower(FOLLOWER_ID).markFollowerActive();
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Expected Leader", behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", newBehavior instanceof Leader);
     }
 
-    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
 
@@ -1871,8 +1871,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.markFollowerActive("follower-1");
         leader.markFollowerActive("follower-2");
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
         final JavaTestKit probe = new JavaTestKit(getSystem());
@@ -1883,8 +1883,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.markFollowerInActive("follower-1");
         leader.markFollowerActive("follower-2");
-        behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
+        newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when majority of followers are active",
+                newBehavior instanceof Leader);
 
         // kill 2nd follower and leader should change to Isolated leader
         followerActor2.tell(PoisonPill.getInstance(), null);
@@ -1901,80 +1902,77 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
         logStart("testIsolatedLeaderCheckTwoFollowers");
 
-        RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
 
         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
-            behavior instanceof IsolatedLeader);
+            newBehavior instanceof IsolatedLeader);
     }
 
     @Test
     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
 
-        RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
 
         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
-                behavior instanceof Leader);
+                newBehavior instanceof Leader);
     }
 
     @Test
     public void testLaggingFollowerStarvation() throws Exception {
         logStart("testLaggingFollowerStarvation");
-        new JavaTestKit(getSystem()) {{
-            String leaderActorId = actorFactory.generateActorId("leader");
-            String follower1ActorId = actorFactory.generateActorId("follower");
-            String follower2ActorId = actorFactory.generateActorId("follower");
 
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
-                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
-            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
-            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+        String leaderActorId = actorFactory.generateActorId("leader");
+        String follower1ActorId = actorFactory.generateActorId("follower");
+        String follower2ActorId = actorFactory.generateActorId("follower");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+        final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+        final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
 
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+        MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
 
-            leaderActorContext.setConfigParams(configParams);
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+        leaderActorContext.setConfigParams(configParams);
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(follower1ActorId,
-                    follower1Actor.path().toString());
-            peerAddresses.put(follower2ActorId,
-                    follower2Actor.path().toString());
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
-            leaderActorContext.getTermInformation().update(1, leaderActorId);
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(follower1ActorId,
+                follower1Actor.path().toString());
+        peerAddresses.put(follower2ActorId,
+                follower2Actor.path().toString());
 
-            RaftActorBehavior leader = createBehavior(leaderActorContext);
+        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getTermInformation().update(1, leaderActorId);
 
-            leaderActor.underlyingActor().setBehavior(leader);
+        leader = createBehavior(leaderActorContext);
 
-            for(int i=1;i<6;i++) {
-                // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
-                assertTrue(newBehavior == leader);
-                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
-            }
+        leaderActor.underlyingActor().setBehavior(leader);
 
-            // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
-            List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+        for (int i = 1; i < 6; i++) {
+            // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+            RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
+                    new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+            assertTrue(newBehavior == leader);
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
 
-            assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
-                    heartbeats.size() > 1);
+        // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+        List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
 
-            // Check if follower-2 got AppendEntries during this time and was not starved
-            List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+        assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+                heartbeats.size() > 1);
 
-            assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
-                    appendEntries.size() > 1);
+        // Check if follower-2 got AppendEntries during this time and was not starved
+        List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
 
-        }};
+        assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+                appendEntries.size() > 1);
     }
 
     @Test
@@ -1987,12 +1985,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
 
-        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
+                VotingState.NON_VOTING);
 
         leader = new Leader(leaderActorContext);
         leaderActorContext.setCurrentBehavior(leader);
@@ -2047,6 +2047,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testTransferLeadershipWithFollowerInSync");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        leaderActorContext.setLastApplied(-1);
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
@@ -2140,8 +2141,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
         MessageCollectorActor.clearMessages(followerActor);
 
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
-                getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
@@ -2181,9 +2182,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         verify(mockTransferCohort, never()).transferComplete();
 
         // Send heartbeats to time out the transfer.
-        for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
-                    getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                    .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
 
@@ -2204,7 +2205,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
-        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+        MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
             super();
             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
             this.snapshotChunkSize = snapshotChunkSize;