Bug 2787: Batch AppendEntries to speed up follower sync 32/22032/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 2 Jun 2015 06:55:15 +0000 (02:55 -0400)
committerMoiz Raja <moraja@cisco.com>
Sat, 6 Jun 2015 18:38:41 +0000 (18:38 +0000)
AbstractLeader#sendUpdatesToFollower now attempts to send all entries
to the follower to catch it up with 1 AppendEntries message. However, we
don't want to send too large a message and risk exceeding akka's message
size limit. So I added another param, maxDataSize, to
ReplicatedLog#getFrom. It will attempt to add all entries up to
maxEntries but stops if the accumulated data size exceeds maxDataSize.

For maxDataSize, I reused the existing snapshotChunkSize (default 2M)
defined in ConfigParams. This currently is hard-coded - we may want to
make this configurable.

Change-Id: Ib2c1165c4140a36f4eada8f81b4261260615dd18
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 6065ba82c90e366919a1b78105507b935b91af8e)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index b4b2afbc4ad602ccd1bc9f50da8641cd0b05f605..c245206f641f3a4ff31da8608076f1c3d68cb4f6 100644 (file)
@@ -124,20 +124,43 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
-        return getFrom(logEntryIndex, journal.size());
+        return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
     }
 
     @Override
-    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
-            int maxIndex = adjustedIndex + max;
+            int maxIndex = adjustedIndex + maxEntries;
             if(maxIndex > size){
                 maxIndex = size;
             }
-            return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+
+            if(maxDataSize == NO_MAX_SIZE) {
+                return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+            } else {
+                List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
+                long totalSize = 0;
+                for(int i = adjustedIndex; i < maxIndex; i++) {
+                    ReplicatedLogEntry entry = journal.get(i);
+                    totalSize += entry.size();
+                    if(totalSize <= maxDataSize) {
+                        retList.add(entry);
+                    } else {
+                        if(retList.isEmpty()) {
+                            // Edge case - the first entry's size exceeds the threshold. We need to return
+                            // at least the first entry so add it here.
+                            retList.add(entry);
+                        }
+
+                        break;
+                    }
+                }
+
+                return retList;
+            }
         } else {
             return Collections.emptyList();
         }
index d4d13899eb770e4ee8bb88a634dd85ec2cfb7bbc..1b87f44f846ed3238aae637c905062183d85e07a 100644 (file)
@@ -49,6 +49,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     // in-memory journal can use before it needs to snapshot
     private int snapshotDataThresholdPercentage = 12;
 
+    private int snaphotChunkSize = SNAPSHOT_CHUNK_SIZE;
+
     private long electionTimeoutFactor = 2;
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
@@ -64,6 +66,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
     }
 
+    public void setSnaphotChunkSize(int snaphotChunkSize) {
+        this.snaphotChunkSize = snaphotChunkSize;
+    }
+
     public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
@@ -109,7 +115,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public int getSnapshotChunkSize() {
-        return SNAPSHOT_CHUNK_SIZE;
+        return snaphotChunkSize;
     }
 
     @Override
index 0ad1df3c33bb6433e5ddf0afca46f5f4920d2882..9e99be118451523153871652b964d7824931c1ad 100644 (file)
@@ -10,67 +10,61 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
  */
 public interface ReplicatedLog {
+    long NO_MAX_SIZE = -1;
+
     /**
-     * Get a replicated log entry at the specified index
+     * Return the replicated log entry at the specified index.
      *
      * @param index the index of the log entry
-     * @return the ReplicatedLogEntry at index. null if index less than 0 or
+     * @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or
      * greater than the size of the in-memory journal.
      */
-    ReplicatedLogEntry get(long index);
-
+    @Nullable ReplicatedLogEntry get(long index);
 
     /**
-     * Get the last replicated log entry
-     *
-     * @return
+     * Return the last replicated log entry in the log or null of not found.
      */
-    ReplicatedLogEntry last();
+    @Nullable ReplicatedLogEntry last();
 
     /**
-     *
-     * @return
+     * Return the index of the last entry in the log or -1 if the log is empty.
      */
     long lastIndex();
 
     /**
-     *
-     * @return
+     * Return the term of the last entry in the log or -1 if the log is empty.
      */
     long lastTerm();
 
     /**
-     * To be called when we need to remove entries from the in-memory log.
-     * This method will remove all entries >= index. This method should be used
-     * during recovery to appropriately trim the log based on persisted
-     * information
+     * Removes entries from the in-memory log starting at the given index.
      *
-     * @param index the index of the log entry
-     * @return the adjusted index of the first log entry removed or -1 if log entry not found.
+     * @param index the index of the first log entry to remove
+     * @return the adjusted index of the first log entry removed or -1 if the log entry is not found.
      */
     long removeFrom(long index);
 
-
     /**
-     * To be called when we need to remove entries from the in-memory log and we
-     * need that information persisted to disk. This method will remove all
-     * entries >= index.
+     * Removes entries from the in-memory log a nd the persisted log starting at the given index.
      * <p>
      * The persisted information would then be used during recovery to properly
      * reconstruct the state of the in-memory replicated log
      *
-     * @param index the index of the log entry
+     * @param the index of the first log entry to remove
      */
     void removeFromAndPersist(long index);
 
     /**
-     * Append an entry to the log
-     * @param replicatedLogEntry
+     * Appends an entry to the log.
+     *
+     * @param replicatedLogEntry the entry to append
      */
     void append(ReplicatedLogEntry replicatedLogEntry);
 
@@ -82,24 +76,32 @@ public interface ReplicatedLog {
     void increaseJournalLogCapacity(int amount);
 
     /**
+     * Appends an entry to the in-memory log and persists it as well.
      *
-     * @param replicatedLogEntry
+     * @param replicatedLogEntry the entry to append
      */
     void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
 
     void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
 
     /**
+     * Returns a list of log entries starting from the given index to the end of the log.
      *
-     * @param index the index of the log entry
+     * @param index the index of the first log entry to get.
+     * @return the List of entries
      */
-    List<ReplicatedLogEntry> getFrom(long index);
+    @Nonnull List<ReplicatedLogEntry> getFrom(long index);
 
     /**
+     * Returns a list of log entries starting from the given index up to the given maximum of entries or
+     * the given maximum accumulated size, whichever comes first.
      *
-     * @param index the index of the log entry
+     * @param index the index of the first log entry to get
+     * @param maxEntries the maximum number of entries to get
+     * @param maxDataSize the maximum accumulated size of the log entries to get
+     * @return the List of entries meeting the criteria.
      */
-    List<ReplicatedLogEntry> getFrom(long index, int max);
+    @Nonnull List<ReplicatedLogEntry> getFrom(long index, int maxEntries, long maxDataSize);
 
     /**
      *
index 8fb66998c71abc8cb45657928822d580f09ab753..2eb3b32c6f13d0c01db85fe7fa141a1eb6c2aac9 100644 (file)
@@ -504,9 +504,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
                             followerNextIndex, followerId);
 
-                    // FIXME : Sending one entry at a time
                     if(followerLogInformation.okToReplicate()) {
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+                        // Try to send all the entries in the journal but not exceeding the max data size
+                        // for a single AppendEntries message.
+                        int maxEntries = (int) context.getReplicatedLog().size();
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+                                context.getConfigParams().getSnapshotChunkSize());
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
index afec6a5d3ddb7c61c7effd7d67bdb9aa32cc2026..a3e4396f37a95f698c8bc660fc558dbd443fd38a 100644 (file)
@@ -55,7 +55,7 @@ public class AbstractReplicatedLogImplTest {
         Assert.assertNull("get(0)", replicatedLogImpl.get(0));
         Assert.assertNull("last", replicatedLogImpl.last());
 
-        List<ReplicatedLogEntry> list = replicatedLogImpl.getFrom(0, 1);
+        List<ReplicatedLogEntry> list = replicatedLogImpl.getFrom(0, 1, ReplicatedLog.NO_MAX_SIZE);
         assertEquals("getFrom size", 0, list.size());
 
         assertEquals("removeFrom", -1, replicatedLogImpl.removeFrom(1));
@@ -136,19 +136,41 @@ public class AbstractReplicatedLogImplTest {
     }
 
     @Test
-    public void testGetFromWithMax(){
-        List<ReplicatedLogEntry> from = replicatedLogImpl.getFrom(0, 1);
+    public void testGetFromWithMax() {
+        List<ReplicatedLogEntry> from = replicatedLogImpl.getFrom(0, 1, ReplicatedLog.NO_MAX_SIZE);
         Assert.assertEquals(1, from.size());
-        Assert.assertEquals(1, from.get(0).getTerm());
+        Assert.assertEquals("A", from.get(0).getData().toString());
 
-        from = replicatedLogImpl.getFrom(0, 20);
+        from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
         Assert.assertEquals(4, from.size());
-        Assert.assertEquals(2, from.get(3).getTerm());
+        Assert.assertEquals("A", from.get(0).getData().toString());
+        Assert.assertEquals("D", from.get(3).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 2);
+        from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
         Assert.assertEquals(2, from.size());
-        Assert.assertEquals(1, from.get(1).getTerm());
+        Assert.assertEquals("B", from.get(0).getData().toString());
+        Assert.assertEquals("C", from.get(1).getData().toString());
 
+        from = replicatedLogImpl.getFrom(1, 3, 2);
+        Assert.assertEquals(2, from.size());
+        Assert.assertEquals("B", from.get(0).getData().toString());
+        Assert.assertEquals("C", from.get(1).getData().toString());
+
+        from = replicatedLogImpl.getFrom(1, 3, 3);
+        Assert.assertEquals(3, from.size());
+        Assert.assertEquals("B", from.get(0).getData().toString());
+        Assert.assertEquals("C", from.get(1).getData().toString());
+        Assert.assertEquals("D", from.get(2).getData().toString());
+
+        from = replicatedLogImpl.getFrom(1, 2, 3);
+        Assert.assertEquals(2, from.size());
+        Assert.assertEquals("B", from.get(0).getData().toString());
+        Assert.assertEquals("C", from.get(1).getData().toString());
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("12345")));
+        from = replicatedLogImpl.getFrom(4, 2, 2);
+        Assert.assertEquals(1, from.size());
+        Assert.assertEquals("12345", from.get(0).getData().toString());
     }
 
     @Test
index e19b5216103757bafada5d9659c913c9de79c3e5..31a12d5659dcef801dce2bfa14357d9f53e58838 100644 (file)
@@ -435,7 +435,7 @@ public class MockRaftActorContext implements RaftActorContext {
 
         public  MockReplicatedLogBuilder createEntries(int start, int end, int term) {
             for (int i=start; i<end; i++) {
-                this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
+                this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload(Integer.toString(i))));
             }
             return this;
         }
index 33e5c4bcdf4e324ebd842c651fdbb99e8ad46f59..315583fa381506d191db535c06fc23924d1f500b 100644 (file)
@@ -68,7 +68,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
 
         // Now deliver the CaptureSnapshotReply to the leader.
         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
@@ -116,7 +116,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
 
         reinstateLeaderActor();
 
@@ -146,6 +146,9 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
 
         payload0 = sendPayloadData(leaderActor, "zero");
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+
         payload1 = sendPayloadData(leaderActor, "one");
 
         // Verify the leader applies the states.
index 27854f40e957a7a395fcd3e8c0cd5df871b3e00e..ccde8bfb226fb4cb3b3dcbaa333139de1eb00524 100644 (file)
@@ -156,11 +156,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         followerActor.underlyingActor().clear();
 
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, lastIndex + 1, payload);
-        actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -171,7 +167,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
     }
 
     @Test
@@ -1124,12 +1120,15 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
         followerActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
-        followerActorContext.setCommitIndex(1);
-        followerActorContext.setLastApplied(1);
+        followerActorContext.setCommitIndex(0);
+        followerActorContext.setLastApplied(0);
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
@@ -1151,26 +1150,37 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's second log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's third log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
 
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
     }
@@ -1189,6 +1199,9 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
@@ -1215,26 +1228,37 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's first log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's second log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
 
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
     }
@@ -1283,22 +1307,21 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's first log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's third log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
@@ -1381,6 +1404,88 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
+    @Test
+    public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+        logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+        long leaderCommitIndex = 3;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+        MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+        assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
     @Test
     public void testHandleRequestVoteReply(){
         logStart("testHandleRequestVoteReply");
@@ -1463,83 +1568,6 @@ public class LeaderTest extends AbstractLeaderTest {
         }};
     }
 
-
-    @Test
-    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        logStart("testAppendEntryCallAtEndofAppendEntryReply");
-
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
-
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
-        leaderActorContext.setConfigParams(configParams);
-
-        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
-
-        followerActorContext.setConfigParams(configParams);
-        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
-
-        Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().setBehavior(follower);
-
-        leaderActorContext.getReplicatedLog().removeFrom(0);
-        leaderActorContext.setCommitIndex(-1);
-        leaderActorContext.setLastApplied(-1);
-
-        followerActorContext.getReplicatedLog().removeFrom(0);
-        followerActorContext.setCommitIndex(-1);
-        followerActorContext.setLastApplied(-1);
-
-        leader = new Leader(leaderActorContext);
-
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
-                leaderActor, AppendEntriesReply.class);
-
-        leader.handleMessage(followerActor, appendEntriesReply);
-
-        // Clear initial heartbeat messages
-
-        leaderActor.underlyingActor().clear();
-        followerActor.underlyingActor().clear();
-
-        // create 3 entries
-        leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-        leaderActorContext.setCommitIndex(1);
-        leaderActorContext.setLastApplied(1);
-
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
-
-        leader.handleMessage(leaderActor, new SendHeartBeat());
-
-        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send first log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-        assertEquals(-1, appendEntries.getPrevLogIndex());
-
-        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-
-        assertEquals(1, appendEntriesReply.getLogLastTerm());
-        assertEquals(0, appendEntriesReply.getLogLastIndex());
-
-        followerActor.underlyingActor().clear();
-
-        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
-        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send second log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-
-        follower.close();
-    }
-
     @Test
     public void testLaggingFollowerStarvation() throws Exception {
         logStart("testLaggingFollowerStarvation");