Migrate from JavaTestKit to javadsl.TestKit
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 4e4fa2c33e2d5fff349f2fc22daa9859b6313a47..04ae7ffe8ce77f4e6634f1a672013d79b9d01764 100644 (file)
@@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -22,19 +23,27 @@ import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -43,8 +52,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -53,6 +60,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -60,8 +68,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -83,7 +95,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     @Override
     @After
-    public void tearDown() throws Exception {
+    public void tearDown() {
         if (leader != null) {
             leader.close();
         }
@@ -147,16 +159,18 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
         return sendReplicate(actorContext, 1, index);
     }
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, index, payload);
+        return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+        SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
     }
 
     @Test
@@ -513,13 +527,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
         long term = actorContext.getTermInformation().getCurrentTerm();
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
+        ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
+                newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
 
         actorContext.getReplicatedLog().append(newEntry);
 
         final Identifier id = new MockIdentifier("state-id");
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(leaderActor, id, newEntry, true));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -532,7 +547,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 leaderActor, ApplyState.class);
         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-        for (int i = 0; i <= newLogIndex - 1; i++ ) {
+        for (int i = 0; i <= newLogIndex - 1; i++) {
             ApplyState applyState = applyStateList.get(i);
             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
@@ -577,11 +592,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
@@ -638,8 +654,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // new entry
-        ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry =
+                new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -649,7 +665,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -683,10 +699,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(null);
+        leader.setSnapshotHolder(null);
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -694,20 +710,19 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -732,6 +747,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().removeFrom(0);
 
+        AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+        actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
+
         leader = new Leader(actorContext);
         actorContext.setCurrentBehavior(leader);
 
@@ -739,15 +757,15 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(null);
+        leader.setSnapshotHolder(null);
 
         for (int i = 0; i < 4; i++) {
-            actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+            actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
                     new MockRaftActorContext.MockPayload("X" + i)));
         }
 
         // new entry
-        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
                 new MockRaftActorContext.MockPayload("D"));
 
         actorContext.getReplicatedLog().append(entry);
@@ -762,17 +780,35 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it should not initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+        assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
 
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Sending Replicate message should not initiate another capture since the first is in progress.
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+        final byte[] bytes = new byte[]{1, 2, 3};
+        installSnapshotStream.get().get().write(bytes);
+        actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+                Runtime.getRuntime().totalMemory());
+        MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+        MessageCollectorActor.clearMessages(followerActor);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
 
@@ -810,11 +846,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -861,11 +898,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -915,11 +953,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while (!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
@@ -983,11 +1022,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1057,12 +1096,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1122,11 +1161,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1151,7 +1190,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testLeaderInstallSnapshotState() {
+    public void testLeaderInstallSnapshotState() throws IOException {
         logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
@@ -1163,7 +1202,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         byte[] barray = bs.toByteArray();
 
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(barray));
 
         assertEquals(bs.size(), barray.length);
 
@@ -1187,6 +1226,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         }
 
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+        fts.close();
     }
 
     @Override
@@ -1200,11 +1240,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Override
-    protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+    protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
         return createActorContext(LEADER_ID, actorRef);
     }
 
-    private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+    private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
@@ -1855,7 +1895,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertTrue("Expected Leader", newBehavior instanceof Leader);
     }
 
-    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+    private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
 
@@ -1876,7 +1916,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
-        final JavaTestKit probe = new JavaTestKit(getSystem());
+        final TestKit probe = new TestKit(getSystem());
         probe.watch(followerActor1);
         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
@@ -1989,8 +2029,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setLastApplied(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
-        TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
-                Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+        ActorRef nonVotingFollowerActor = actorFactory.createActor(
+                MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
 
         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
                 VotingState.NON_VOTING);
@@ -2072,6 +2112,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
@@ -2102,6 +2143,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2133,6 +2175,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2194,9 +2237,148 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
     }
 
+    @Test
+    public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+        logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+        final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+                Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+                        new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+        final MockRaftActorContext.MockPayload largePayload =
+                new MockRaftActorContext.MockPayload("large", serializedSize);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(300, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send normal payload first to prime commit index.
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        sendReplicate(leaderActorContext, term, 0);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+        assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+        sendReplicate(leaderActorContext, term, 1, largePayload);
+
+        MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+        assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+        final Identifier slicingId = messageSlice.getIdentifier();
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+        messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send another normal payload.
+
+        sendReplicate(leaderActorContext, term, 2);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+    }
+
+    @Test
+    public void testLargePayloadSlicingExpiration() {
+        logStart("testLargePayloadSlicingExpiration");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+                leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+        // Sleep for at least 3 * election timeout so the slicing state expires.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+        MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send an AppendEntriesReply - this should restart the slicing.
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+    }
+
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
-            ActorRef actorRef, RaftRPC rpc) throws Exception {
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+            final ActorRef actorRef, final RaftRPC rpc) {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
@@ -2206,8 +2388,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
-        MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
-            super();
+        MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
             this.snapshotChunkSize = snapshotChunkSize;
         }