Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index bdfdd9b3765c576495e5bbf96dcdb19958c73cd5..2eb3b32c6f13d0c01db85fe7fa141a1eb6c2aac9 100644 (file)
@@ -178,26 +178,33 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
 
         followerLogInformation.markFollowerActive();
+        followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
 
         boolean updated = false;
         if (appendEntriesReply.isSuccess()) {
-            updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
-            updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
-
-            if(updated && LOG.isDebugEnabled()) {
-                LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(),
-                        followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
-            }
+            updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
         } else {
             LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
 
-            // TODO: When we find that the follower is out of sync with the
-            // Leader we simply decrement that followers next index by 1.
-            // Would it be possible to do better than this? The RAFT spec
-            // does not explicitly deal with it but may be something for us to
-            // think about
+            long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+            ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+            if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+                    followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+                // The follower's log is empty or the last entry is present in the leader's journal
+                // and the terms match so the follower is just behind the leader's journal from
+                // the last snapshot, if any. We'll catch up the follower quickly by starting at the
+                // follower's last log index.
 
-            followerLogInformation.decrNextIndex();
+                updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+            } else {
+                // TODO: When we find that the follower is out of sync with the
+                // Leader we simply decrement that followers next index by 1.
+                // Would it be possible to do better than this? The RAFT spec
+                // does not explicitly deal with it but may be something for us to
+                // think about.
+
+                followerLogInformation.decrNextIndex();
+            }
         }
 
         // Now figure out if this reply warrants a change in the commitIndex
@@ -243,6 +250,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
+            AppendEntriesReply appendEntriesReply) {
+        boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+        updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+        if(updated && LOG.isDebugEnabled()) {
+            LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+                    logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+                    followerLogInformation.getNextIndex());
+        }
+        return updated;
+    }
+
     private void purgeInMemoryLog() {
         //find the lowest index across followers which has been replicated to all.
         // lastApplied if there are no followers, so that we keep clearing the log for single-node
@@ -475,8 +495,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 
                 if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
-                    LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
-                            logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+                    LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+                            logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
                 }
 
                 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -484,9 +504,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
                             followerNextIndex, followerId);
 
-                    // FIXME : Sending one entry at a time
                     if(followerLogInformation.okToReplicate()) {
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+                        // Try to send all the entries in the journal but not exceeding the max data size
+                        // for a single AppendEntries message.
+                        int maxEntries = (int) context.getReplicatedLog().size();
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+                                context.getConfigParams().getSnapshotChunkSize());
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
@@ -526,7 +549,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), super.getReplicatedToAllIndex());
+            context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,