Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 3f085df8dc3b858879981c27e223262f2a5bc40f..ccde8bfb226fb4cb3b3dcbaa333139de1eb00524 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
@@ -48,6 +49,7 @@ import scala.concurrent.duration.FiniteDuration;
 public class LeaderTest extends AbstractLeaderTest {
 
     static final String FOLLOWER_ID = "follower";
+    public static final String LEADER_ID = "leader";
 
     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
@@ -84,6 +86,8 @@ public class LeaderTest extends AbstractLeaderTest {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        short payloadVersion = (short)5;
+        actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
         actorContext.getTermInformation().update(term, "");
@@ -97,10 +101,11 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
         assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
 
         // The follower would normally reply - simulate that explicitly here.
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex - 1, term));
+                FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -117,6 +122,16 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+    }
+
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, index, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
 
     @Test
@@ -136,17 +151,12 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
 
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, lastIndex + 1, payload);
-        actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(null, null, newEntry));
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -157,9 +167,221 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
     }
 
+    @Test
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<5;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect only 1 message to be sent because of two reasons,
+        // - an append entries reply was not received
+        // - the heartbeat interval has not expired
+        // In this scenario if multiple messages are sent they would likely be duplicates
+        assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+    }
+
+    @Test
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+        logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+            leader.handleMessage(followerActor, new AppendEntriesReply(
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
+
+        }
+
+        for(int i=3;i<5;i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+        // get sent to the follower - but not the 5th
+        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+        for(int i=0;i<4;i++) {
+            long expected = allMessages.get(i).getEntries().get(0).getIndex();
+            assertEquals(expected, i+2);
+        }
+    }
+
+    @Test
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+        logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        sendReplicate(actorContext, lastIndex+1);
+
+        // Wait slightly longer than heartbeat duration
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(1, allMessages.get(0).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+    }
+
+    @Test
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+        logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+    }
+
+    @Test
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+        logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        sendReplicate(actorContext, lastIndex+1);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(0, allMessages.get(0).getEntries().size());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+    }
+
+
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
@@ -304,6 +526,8 @@ public class LeaderTest extends AbstractLeaderTest {
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
+        actorContext.getReplicatedLog().append(entry);
+
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
@@ -313,7 +537,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
     }
 
     @Test
@@ -361,7 +585,9 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
         assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
@@ -372,8 +598,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test
@@ -726,7 +951,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
     @Override
     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
-        return createActorContext("leader", actorRef);
+        return createActorContext(LEADER_ID, actorRef);
     }
 
     private MockRaftActorContext createActorContextWithFollower() {
@@ -745,6 +970,15 @@ public class LeaderTest extends AbstractLeaderTest {
         return context;
     }
 
+    private MockRaftActorContext createFollowerActorContextWithLeader() {
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+        followerConfig.setElectionTimeoutFactor(10000);
+        followerActorContext.setConfigParams(followerConfig);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+        return followerActorContext;
+    }
+
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
@@ -805,14 +1039,15 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
 
-        Map<String, String> peerAddresses = new HashMap<>();
-        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+        Map<String, String> leaderPeerAddresses = new HashMap<>();
+        leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setPeerAddresses(leaderPeerAddresses);
 
         leaderActorContext.getReplicatedLog().removeFrom(0);
 
@@ -872,26 +1107,242 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailure(){
-        logStart("testHandleAppendEntriesReplyFailure");
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        long leaderCommitIndex = 2;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(0);
+        followerActorContext.setLastApplied(0);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
+
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
-        // Send initial heartbeat reply with last index.
-        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
-        assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
 
-        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+    }
 
-        assertEquals(RaftState.Leader, raftActorBehavior.state());
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
 
-        assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
     }
 
     @Test
@@ -909,7 +1360,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+        short payloadVersion = 5;
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -932,6 +1384,9 @@ public class LeaderTest extends AbstractLeaderTest {
         ApplyState applyState = applyStateList.get(0);
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(payloadVersion, followerInfo.getPayloadVersion());
     }
 
     @Test
@@ -942,13 +1397,95 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
         assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
+    @Test
+    public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+        logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+        long leaderCommitIndex = 3;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+        MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+        assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
     @Test
     public void testHandleRequestVoteReply(){
         logStart("testHandleRequestVoteReply");
@@ -1031,82 +1568,6 @@ public class LeaderTest extends AbstractLeaderTest {
         }};
     }
 
-
-    @Test
-    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        logStart("testAppendEntryCallAtEndofAppendEntryReply");
-
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
-
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
-        leaderActorContext.setConfigParams(configParams);
-
-        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
-
-        followerActorContext.setConfigParams(configParams);
-
-        Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().setBehavior(follower);
-
-        leaderActorContext.getReplicatedLog().removeFrom(0);
-        leaderActorContext.setCommitIndex(-1);
-        leaderActorContext.setLastApplied(-1);
-
-        followerActorContext.getReplicatedLog().removeFrom(0);
-        followerActorContext.setCommitIndex(-1);
-        followerActorContext.setLastApplied(-1);
-
-        leader = new Leader(leaderActorContext);
-
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
-                leaderActor, AppendEntriesReply.class);
-
-        leader.handleMessage(followerActor, appendEntriesReply);
-
-        // Clear initial heartbeat messages
-
-        leaderActor.underlyingActor().clear();
-        followerActor.underlyingActor().clear();
-
-        // create 3 entries
-        leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-        leaderActorContext.setCommitIndex(1);
-        leaderActorContext.setLastApplied(1);
-
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
-
-        leader.handleMessage(leaderActor, new SendHeartBeat());
-
-        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send first log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-        assertEquals(-1, appendEntries.getPrevLogIndex());
-
-        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-
-        assertEquals(1, appendEntriesReply.getLogLastTerm());
-        assertEquals(0, appendEntriesReply.getLogLastIndex());
-
-        followerActor.underlyingActor().clear();
-
-        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
-        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send second log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-
-        follower.close();
-    }
-
     @Test
     public void testLaggingFollowerStarvation() throws Exception {
         logStart("testLaggingFollowerStarvation");
@@ -1147,7 +1608,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
             for(int i=1;i<6;i++) {
                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
                 assertTrue(newBehavior == leader);
                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
             }