Avoid unnecessary unsuccessful AppendEntriesReply 29/78929/2
author Tom Pantelis <tompantelis@gmail.com>
Fri, 14 Dec 2018 17:36:16 +0000 (12:36 -0500)
committer Robert Varga <robert.varga@pantheon.tech>
Fri, 21 Dec 2018 10:24:38 +0000 (11:24 +0100)
In some CSIT runs I see messages such as "The prevLogIndex 10 was
found in the log but the term -1 is not equal to the append entries
prevLogTerm 2" which results in the Follower sending an unsuccessful
reply indicating it's out-of-sync. This causes the Leader to try to
catch it up.

However, in the test cases the Follower isn't actually
out-of-sync; rather, it gets far enough behind (the test issues 1000
txns per second per node) that the leader re-sends AppendEntries
with log entries it already sent. When the Follower eventually receives
the first AppendEntries, it appends the entries and may do a "fake"
snapshot, i.e. trim the in-memory journal list and advance the snapshot
index. If so, when the duplicate AppendEntries is received, the leader's
previous log index is no longer in the Follower's journal log.
In Follower.isOutOfSync, isLogEntryPresent returns true because the leader's
previous index is included in the last snapshot even though it was trimmed
from the journal log. However, getLogEntryTerm returns -1 because it
checks if the index is in the journal log or equal to the snapshot
index, i.e. it doesn't take into account that the index may be in the
last snapshot. This is inconsistent with the first check and results in
the Follower reporting that it's out-of-sync when it really isn't.

I've also seen a negative result of this on the Leader side when a Follower
reports it's out-of-sync: the Follower's last log index is in the snapshot
but getLogEntryTerm returns -1, causing the Leader to take the last-resort,
inefficient path of decrementing the Follower's next index in order to
catch it up.

Therefore I changed these 2 cases to also check if the index is in the snapshot
and, if so, use the snapshot term.

Change-Id: I4331d8fee85789a7004a692abb1b9c629eecd570
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c77c163d54872612b47f0f2550931fa408ed596b)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index d3a44033a4be633979733e812bde88964a038133..86ba5ecc97aee9e7bbef8c103c08dcac6e5da36f 100644 (file)
@@ -205,7 +205,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
+        return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
index 15dbd74d01d9b2d3b3aed4b98fdb414b3943c5ae..e585066d736225565a13700a513bb6a607ffd61a 100644 (file)
@@ -229,7 +229,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-        long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
         if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
@@ -246,9 +245,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
             log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
-                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
-                    context.getReplicatedLog().getSnapshotIndex());
+                    + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+                    followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+                    context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -257,6 +257,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
+            long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
             if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
                     && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
                 // The follower's last entry is present in the leader's journal but the terms don't match so the
@@ -278,9 +279,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
-                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
+                    + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+                    context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
+                    getReplicatedToAllIndex());
 
+            long followersLastLogTermInLeadersLogOrSnapshot = getLogEntryOrSnapshotTerm(followerLastLogIndex);
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -289,12 +293,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
-                    && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
-                // The follower's log is empty or the last entry is present in the leader's journal
-                // and the terms match so the follower is just behind the leader's journal from
-                // the last snapshot, if any. We'll catch up the follower quickly by starting at the
-                // follower's last log index.
+            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLogOrSnapshot >= 0
+                    && followersLastLogTermInLeadersLogOrSnapshot == appendEntriesReply.getLogLastTerm()) {
+                // The follower's log is empty or the follower's last entry is present in the leader's journal or
+                // snapshot and the terms match so the follower is just behind the leader's journal from the last
+                // snapshot, if any. We'll catch up the follower quickly by starting at the follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
 
@@ -310,7 +313,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                     log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
                             logName(), followerId, appendEntriesReply.getLogLastTerm(),
-                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                            followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
                 }
             }
         }
index e8c1b09772f172157c177c747b1bc2f7c5b51430..83ece40a1d88fd833e1c42b92efab8b55e3a4047 100644 (file)
@@ -345,7 +345,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * Returns the actual term of the entry in replicated log for the given index or -1 if not found.
+     * Returns the actual term of the entry in the replicated log for the given index or -1 if not found.
      *
      * @return the log entry term or -1 if not found
      */
@@ -362,6 +362,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return -1;
     }
 
+    /**
+     * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the
+     * snapshot term if the given index is in the snapshot or -1 otherwise.
+     *
+     * @return the term or -1 otherwise
+     */
+    protected long getLogEntryOrSnapshotTerm(final long index) {
+        if (context.getReplicatedLog().isInSnapshot(index)) {
+            return context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        return getLogEntryTerm(index);
+    }
+
     /**
      * Applies the log entries up to the specified index that is known to be committed to the state machine.
      *
index 4fa0e161db16ffc2ada57fd52cc6af22ce8160fd..2377fbf442d45099856d9ebd61f402b01588e4b7 100644 (file)
@@ -338,16 +338,18 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (lastIndex > -1) {
             if (isLogEntryPresent(appendEntries.getPrevLogIndex())) {
-                final long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
-                if (prevLogTerm != appendEntries.getPrevLogTerm()) {
+                final long leadersPrevLogTermInFollowersLogOrSnapshot =
+                        getLogEntryOrSnapshotTerm(appendEntries.getPrevLogIndex());
+                if (leadersPrevLogTermInFollowersLogOrSnapshot != appendEntries.getPrevLogTerm()) {
 
                     // The follower's log is out of sync because the Leader's prevLogIndex entry does exist
-                    // in the follower's log but it has a different term in it
+                    // in the follower's log or snapshot but it has a different term.
 
                     log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append "
-                            + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(),
-                            appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
-                            context.getReplicatedLog().getSnapshotIndex());
+                        + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot,
+                        appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
 
                     sendOutOfSyncAppendEntriesReply(sender, false);
                     return true;
@@ -357,8 +359,8 @@ public class Follower extends AbstractRaftActorBehavior {
                 // The follower's log is out of sync because the Leader's prevLogIndex entry was not found in it's log
 
                 log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, "
-                        + "snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
-                        context.getReplicatedLog().getSnapshotIndex());
+                        + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
@@ -372,7 +374,9 @@ public class Follower extends AbstractRaftActorBehavior {
                 // the previous entry in it's in-memory journal
 
                 log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the "
-                        + "in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getReplicatedToAllIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
@@ -381,7 +385,9 @@ public class Follower extends AbstractRaftActorBehavior {
             final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
             if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).getIndex() - 1)) {
                 log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
-                        + "in-memory journal", logName(), entries.get(0).getIndex() - 1);
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
index c4d97ea0e56a4f6259e7cb48f2f5eac1577795de..f1d7f9ea45483e366b44702e67c07bd360da87a3 100644 (file)
@@ -259,7 +259,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     @Test
     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
-        logStart("testHandleFirstAppendEntries");
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -285,7 +285,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     @Test
     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
-        logStart("testHandleFirstAppendEntries");
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -511,7 +511,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     /**
-     * This test verifies that when an AppendEntries is received a specific prevLogTerm
+     * This test verifies that when an AppendEntries is received with a prevLogTerm
      * which does not match the term that is in RaftActors log entry at prevLogIndex
      * then the RaftActor does not change it's state and it returns a failure.
      */
@@ -521,13 +521,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         MockRaftActorContext context = createActorContext();
 
-        // First set the receivers term to lower number
-        context.getTermInformation().update(95, "test");
-
-        // AppendEntries is now sent with a bigger term
-        // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
-                (short)0);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -541,6 +535,28 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("isSuccess", false, reply.isSuccess());
     }
 
+    @Test
+    public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
+        logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
+        context.getReplicatedLog().setSnapshotIndex(4);
+        context.getReplicatedLog().setSnapshotTerm(3);
+
+        AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
+
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertEquals("isSuccess", true, reply.isSuccess());
+    }
+
     /**
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver match that the new