Bug 8206: Fix IOException from initiateCaptureSnapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index f13938cdb522a31fc1f4f24a4e8fb6f7d60f73ea..c3d33e12ec41ae744079bf864a4ab7d769f632f8 100644 (file)
@@ -14,24 +14,31 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -42,8 +49,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -52,6 +57,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -59,6 +65,9 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -83,7 +92,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     @Override
     @After
     public void tearDown() throws Exception {
-        if(leader != null) {
+        if (leader != null) {
             leader.close();
         }
 
@@ -106,7 +115,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
         actorContext.setCommitIndex(-1);
-        short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
@@ -116,7 +124,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.setCurrentBehavior(leader);
 
         // Leader should send an immediate heartbeat with no entries as follower is inactive.
-        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        final long lastIndex = actorContext.getReplicatedLog().lastIndex();
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("getTerm", term, appendEntries.getTerm());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
@@ -132,8 +140,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         followerActor.underlyingActor().clear();
 
         // Sleep for the heartbeat interval so AppendEntries is sent.
-        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
-                getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
@@ -147,16 +155,15 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
         return sendReplicate(actorContext, 1, index);
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, index, payload);
+        SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
     }
 
     @Test
@@ -286,7 +293,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
-        assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
+        assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
     }
 
     @Test
@@ -317,8 +324,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<5;i++) {
-            sendReplicate(actorContext, lastIndex+i+1);
+        for (int i = 0; i < 5; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
         }
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
@@ -357,14 +364,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<3;i++) {
-            sendReplicate(actorContext, lastIndex+i+1);
+        for (int i = 0; i < 3; i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
 
         }
 
-        for(int i=3;i<5;i++) {
+        for (int i = 3; i < 5; i++) {
             sendReplicate(actorContext, lastIndex + i + 1);
         }
 
@@ -373,9 +380,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // get sent to the follower - but not the 5th
         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
 
-        for(int i=0;i<4;i++) {
+        for (int i = 0; i < 4; i++) {
             long expected = allMessages.get(i).getEntries().get(0).getIndex();
-            assertEquals(expected, i+2);
+            assertEquals(expected, i + 2);
         }
     }
 
@@ -407,7 +414,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        sendReplicate(actorContext, lastIndex+1);
+        sendReplicate(actorContext, lastIndex + 1);
 
         // Wait slightly longer than heartbeat duration
         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
@@ -418,9 +425,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
 
         assertEquals(1, allMessages.get(0).getEntries().size());
-        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
         assertEquals(1, allMessages.get(1).getEntries().size());
-        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
 
     }
 
@@ -452,7 +459,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         followerActor.underlyingActor().clear();
 
-        for(int i=0;i<3;i++) {
+        for (int i = 0; i < 3; i++) {
             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
@@ -491,7 +498,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
-        sendReplicate(actorContext, lastIndex+1);
+        sendReplicate(actorContext, lastIndex + 1);
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
@@ -513,13 +520,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
         long term = actorContext.getTermInformation().getCurrentTerm();
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
+        ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
+                newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
 
         actorContext.getReplicatedLog().append(newEntry);
 
         final Identifier id = new MockIdentifier("state-id");
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(leaderActor, id, newEntry, true));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -532,7 +540,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 leaderActor, ApplyState.class);
         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-        for(int i = 0; i <= newLogIndex - 1; i++ ) {
+        for (int i = 0; i <= newLogIndex - 1; i++) {
             ApplyState applyState = applyStateList.get(i);
             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
@@ -547,7 +555,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -559,9 +567,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         final int commitIndex = 3;
         final int snapshotIndex = 2;
-        final int newEntryIndex = 4;
         final int snapshotTerm = 1;
-        final int currentTerm = 2;
 
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
@@ -575,20 +581,16 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                        new MockRaftActorContext.MockPayload("D"));
-
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
@@ -618,7 +620,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testSendAppendEntriesSnapshotScenario() throws Exception {
         logStart("testSendAppendEntriesSnapshotScenario");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -645,8 +647,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry =
+                new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -656,7 +658,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -693,7 +695,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.setSnapshot(null);
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -701,20 +703,19 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -739,6 +740,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().removeFrom(0);
 
+        AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+        actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out));
+
         leader = new Leader(actorContext);
         actorContext.setCurrentBehavior(leader);
 
@@ -748,13 +752,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // set the snapshot as absent and check if capture-snapshot is invoked.
         leader.setSnapshot(null);
 
-        for(int i=0;i<4;i++) {
-            actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+        for (int i = 0; i < 4; i++) {
+            actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
                     new MockRaftActorContext.MockPayload("X" + i)));
         }
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -769,17 +773,35 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it should not initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+        assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
 
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Sending Replicate message should not initiate another capture since the first is in progress.
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+        final byte[] bytes = new byte[]{1, 2, 3};
+        installSnapshotStream.get().get().write(bytes);
+        actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+                Runtime.getRuntime().totalMemory());
+        MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+        MessageCollectorActor.clearMessages(followerActor);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
 
@@ -787,7 +809,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testInstallSnapshot() throws Exception {
         logStart("testInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -817,17 +839,19 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
@@ -840,7 +864,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testForceInstallSnapshot() throws Exception {
         logStart("testForceInstallSnapshot");
 
-        MockRaftActorContext actorContext = createActorContextWithFollower();
+        final MockRaftActorContext actorContext = createActorContextWithFollower();
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -867,17 +891,19 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
@@ -920,13 +946,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
-        while(!fts.isLastChunk(fts.getChunkIndex())) {
+        while (!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
         }
@@ -959,7 +986,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
@@ -988,13 +1015,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -1026,7 +1054,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -1036,7 +1064,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
             public int getSnapshotChunkSize() {
                 return 50;
@@ -1061,14 +1089,15 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -1125,20 +1154,21 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
-        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
                 installSnapshot.getLastChunkHashCode().get().intValue());
 
-        int hashCode = Arrays.hashCode(installSnapshot.getData());
+        final int hashCode = Arrays.hashCode(installSnapshot.getData());
 
         followerActor.underlyingActor().clear();
 
@@ -1153,7 +1183,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testLeaderInstallSnapshotState() {
+    public void testLeaderInstallSnapshotState() throws IOException {
         logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
@@ -1165,21 +1195,21 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         byte[] barray = bs.toByteArray();
 
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(barray));
 
         assertEquals(bs.size(), barray.length);
 
-        int chunkIndex=0;
-        for (int i=0; i < barray.length; i = i + 50) {
-            int j = i + 50;
+        int chunkIndex = 0;
+        for (int i = 0; i < barray.length; i = i + 50) {
+            int length = i + 50;
             chunkIndex++;
 
             if (i + 50 > barray.length) {
-                j = barray.length;
+                length = barray.length;
             }
 
             byte[] chunk = fts.getNextChunk();
-            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
+            assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
             fts.markSendStatus(true);
@@ -1189,6 +1219,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         }
 
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+        fts.close();
     }
 
     @Override
@@ -1206,13 +1237,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return createActorContext(LEADER_ID, actorRef);
     }
 
-    private MockRaftActorContext createActorContextWithFollower() {
-        MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
-                followerActor.path().toString()).build());
-        return actorContext;
-    }
-
     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
@@ -1223,6 +1247,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return context;
     }
 
+    private MockRaftActorContext createActorContextWithFollower() {
+        MockRaftActorContext actorContext = createActorContext();
+        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+                followerActor.path().toString()).build());
+        return actorContext;
+    }
+
     private MockRaftActorContext createFollowerActorContextWithLeader() {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
@@ -1236,7 +1267,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
@@ -1291,7 +1322,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
-        MockRaftActorContext leaderActorContext = createActorContext();
+        final MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
@@ -1363,7 +1394,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1376,8 +1407,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
-        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1392,7 +1423,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
@@ -1455,8 +1487,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1471,7 +1503,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
@@ -1522,7 +1555,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1535,8 +1568,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1552,7 +1585,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
@@ -1606,7 +1640,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyWithNewerTerm(){
+    public void testHandleAppendEntriesReplyWithNewerTerm() {
         logStart("testHandleAppendEntriesReplyWithNewerTerm");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1620,7 +1654,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActor.underlyingActor().setBehavior(leader);
         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
 
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         assertEquals(false, appendEntriesReply.isSuccess());
         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
@@ -1629,7 +1664,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+    public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1644,7 +1679,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActor.underlyingActor().setBehavior(leader);
         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
 
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         assertEquals(false, appendEntriesReply.isSuccess());
         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
@@ -1672,7 +1708,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
-        short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
@@ -1704,7 +1739,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyUnknownFollower(){
+    public void testHandleAppendEntriesReplyUnknownFollower() {
         logStart("testHandleAppendEntriesReplyUnknownFollower");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1733,10 +1768,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
-        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
-        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
-        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
-        ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+        final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
 
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
@@ -1751,7 +1786,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader = new Leader(leaderActorContext);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
@@ -1766,7 +1802,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
+                AppendEntries.class, 2);
         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
 
         appendEntries = appendEntriesList.get(0);
@@ -1803,7 +1840,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleRequestVoteReply(){
+    public void testHandleRequestVoteReply() {
         logStart("testHandleRequestVoteReply");
 
         MockRaftActorContext leaderActorContext = createActorContext();
@@ -1828,8 +1865,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         leader = new Leader(leaderActorContext);
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue(behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue(newBehavior instanceof Leader);
     }
 
     @Test
@@ -1847,11 +1884,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader = new Leader(leaderActorContext);
         leader.getFollower(FOLLOWER_ID).markFollowerActive();
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Expected Leader", behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", newBehavior instanceof Leader);
     }
 
-    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
 
@@ -1868,8 +1905,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.markFollowerActive("follower-1");
         leader.markFollowerActive("follower-2");
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
+        RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
         final JavaTestKit probe = new JavaTestKit(getSystem());
@@ -1880,8 +1917,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.markFollowerInActive("follower-1");
         leader.markFollowerActive("follower-2");
-        behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
+        newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when majority of followers are active",
+                newBehavior instanceof Leader);
 
         // kill 2nd follower and leader should change to Isolated leader
         followerActor2.tell(PoisonPill.getInstance(), null);
@@ -1898,80 +1936,77 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
         logStart("testIsolatedLeaderCheckTwoFollowers");
 
-        RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
 
         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
-            behavior instanceof IsolatedLeader);
+            newBehavior instanceof IsolatedLeader);
     }
 
     @Test
     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
 
-        RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+        RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
 
         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
-                behavior instanceof Leader);
+                newBehavior instanceof Leader);
     }
 
     @Test
     public void testLaggingFollowerStarvation() throws Exception {
         logStart("testLaggingFollowerStarvation");
-        new JavaTestKit(getSystem()) {{
-            String leaderActorId = actorFactory.generateActorId("leader");
-            String follower1ActorId = actorFactory.generateActorId("follower");
-            String follower2ActorId = actorFactory.generateActorId("follower");
 
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
-                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
-            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
-            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+        String leaderActorId = actorFactory.generateActorId("leader");
+        String follower1ActorId = actorFactory.generateActorId("follower");
+        String follower2ActorId = actorFactory.generateActorId("follower");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+        final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+        final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
 
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+        MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
 
-            leaderActorContext.setConfigParams(configParams);
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+        leaderActorContext.setConfigParams(configParams);
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(follower1ActorId,
-                    follower1Actor.path().toString());
-            peerAddresses.put(follower2ActorId,
-                    follower2Actor.path().toString());
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
-            leaderActorContext.getTermInformation().update(1, leaderActorId);
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(follower1ActorId,
+                follower1Actor.path().toString());
+        peerAddresses.put(follower2ActorId,
+                follower2Actor.path().toString());
 
-            RaftActorBehavior leader = createBehavior(leaderActorContext);
+        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getTermInformation().update(1, leaderActorId);
 
-            leaderActor.underlyingActor().setBehavior(leader);
+        leader = createBehavior(leaderActorContext);
 
-            for(int i=1;i<6;i++) {
-                // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
-                assertTrue(newBehavior == leader);
-                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
-            }
+        leaderActor.underlyingActor().setBehavior(leader);
 
-            // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
-            List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+        for (int i = 1; i < 6; i++) {
+            // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+            RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
+                    new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+            assertTrue(newBehavior == leader);
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
 
-            assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
-                    heartbeats.size() > 1);
+        // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+        List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
 
-            // Check if follower-2 got AppendEntries during this time and was not starved
-            List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+        assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+                heartbeats.size() > 1);
 
-            assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
-                    appendEntries.size() > 1);
+        // Check if follower-2 got AppendEntries during this time and was not starved
+        List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
 
-        }};
+        assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+                appendEntries.size() > 1);
     }
 
     @Test
@@ -1990,7 +2025,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
 
-        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
+                VotingState.NON_VOTING);
 
         leader = new Leader(leaderActorContext);
         leaderActorContext.setCurrentBehavior(leader);
@@ -2069,6 +2105,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
@@ -2099,6 +2136,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2130,6 +2168,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2139,8 +2178,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
         MessageCollectorActor.clearMessages(followerActor);
 
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
-                getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
@@ -2180,9 +2219,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         verify(mockTransferCohort, never()).transferComplete();
 
         // Send heartbeats to time out the transfer.
-        for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
-                    getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                    .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
 
@@ -2203,7 +2242,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
-        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+        MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
             super();
             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
             this.snapshotChunkSize = snapshotChunkSize;