Avoid unnecessary unsuccessful AppendEntriesReply 05/78805/4
authorTom Pantelis <tompantelis@gmail.com>
Fri, 14 Dec 2018 17:36:16 +0000 (12:36 -0500)
committerStephen Kitt <skitt@redhat.com>
Tue, 18 Dec 2018 17:09:53 +0000 (17:09 +0000)
In some CSIT runs I see messages such as "The prevLogIndex 10 was
found in the log but the term -1 is not equal to the append entries
prevLogTerm 2" which results in the Follower sending an unsuccessful
reply indicating it's out-of-sync. This causes the Leader to try to
catch it up.

However in the test cases, the Follower isn't actually
out-of-sync but rather it gets behind enough (the test issues 1000
txns per second per node) such that the leader re-sends AppendEntries
with log entries it already sent. When the Follower eventually receives
the first AppendEntries, it appends the entries and may do a "fake"
snapshot, ie trim the in-memory journal list and advance the snapshot
index. If so, when the duplicate AppendEntries is received, the leader's
previous log index is no longer in the Follower's journal log.
In Follower.isOutOfSync, isLogEntryPresent returns true b/c the leader's
previous index is included in the last snapshot even though it was trimmed
from the journal log. However, getLogEntryTerm returns -1 b/c it
checks if the index is in the journal log or equal to the snapshot
index, ie it doesn't take into account that the index may be in the
last snapshot. This is inconsistent with the first check and results in
the Follower reporting that it's out-of-sync when it really isnt.

I've also seen a negative result of this on the Leader side when a Follower
reports it's out-of-sync - the Follower's last log index is in the snapshot
but getLogEntryTerm returns -1 causing the Leader to take the last resort,
inefficient path of decrementing the Follower's next index in order to
catch it up.

Therefore I changed these 2 cases to also check if the index is in the snapshot
and, if so, use the snapshot term.

Change-Id: I4331d8fee85789a7004a692abb1b9c629eecd570
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index d3a4403..86ba5ec 100644 (file)
@@ -205,7 +205,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
+        return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
index 15dbd74..e585066 100644 (file)
@@ -229,7 +229,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-        long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
         if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
@@ -246,9 +245,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
             log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
-                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
-                    context.getReplicatedLog().getSnapshotIndex());
+                    + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+                    followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+                    context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -257,6 +257,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
+            long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
             if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
                     && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
                 // The follower's last entry is present in the leader's journal but the terms don't match so the
@@ -278,9 +279,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
-                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
+                    + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+                    context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
+                    getReplicatedToAllIndex());
 
+            long followersLastLogTermInLeadersLogOrSnapshot = getLogEntryOrSnapshotTerm(followerLastLogIndex);
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -289,12 +293,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
-                    && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
-                // The follower's log is empty or the last entry is present in the leader's journal
-                // and the terms match so the follower is just behind the leader's journal from
-                // the last snapshot, if any. We'll catch up the follower quickly by starting at the
-                // follower's last log index.
+            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLogOrSnapshot >= 0
+                    && followersLastLogTermInLeadersLogOrSnapshot == appendEntriesReply.getLogLastTerm()) {
+                // The follower's log is empty or the follower's last entry is present in the leader's journal or
+                // snapshot and the terms match so the follower is just behind the leader's journal from the last
+                // snapshot, if any. We'll catch up the follower quickly by starting at the follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
 
@@ -310,7 +313,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                     log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
                             logName(), followerId, appendEntriesReply.getLogLastTerm(),
-                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                            followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
                 }
             }
         }
index 400f110..b63a7cc 100644 (file)
@@ -347,7 +347,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * Returns the actual term of the entry in replicated log for the given index or -1 if not found.
+     * Returns the actual term of the entry in the replicated log for the given index or -1 if not found.
      *
      * @return the log entry term or -1 if not found
      */
@@ -364,6 +364,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return -1;
     }
 
+    /**
+     * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the
+     * snapshot term if the given index is in the snapshot or -1 otherwise.
+     *
+     * @return the term or -1 otherwise
+     */
+    protected long getLogEntryOrSnapshotTerm(final long index) {
+        if (context.getReplicatedLog().isInSnapshot(index)) {
+            return context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        return getLogEntryTerm(index);
+    }
+
     /**
      * Applies the log entries up to the specified index that is known to be committed to the state machine.
      *
index 4fa0e16..2377fbf 100644 (file)
@@ -338,16 +338,18 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (lastIndex > -1) {
             if (isLogEntryPresent(appendEntries.getPrevLogIndex())) {
-                final long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
-                if (prevLogTerm != appendEntries.getPrevLogTerm()) {
+                final long leadersPrevLogTermInFollowersLogOrSnapshot =
+                        getLogEntryOrSnapshotTerm(appendEntries.getPrevLogIndex());
+                if (leadersPrevLogTermInFollowersLogOrSnapshot != appendEntries.getPrevLogTerm()) {
 
                     // The follower's log is out of sync because the Leader's prevLogIndex entry does exist
-                    // in the follower's log but it has a different term in it
+                    // in the follower's log or snapshot but it has a different term.
 
                     log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append "
-                            + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(),
-                            appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
-                            context.getReplicatedLog().getSnapshotIndex());
+                        + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot,
+                        appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
 
                     sendOutOfSyncAppendEntriesReply(sender, false);
                     return true;
@@ -357,8 +359,8 @@ public class Follower extends AbstractRaftActorBehavior {
                 // The follower's log is out of sync because the Leader's prevLogIndex entry was not found in it's log
 
                 log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, "
-                        + "snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
-                        context.getReplicatedLog().getSnapshotIndex());
+                        + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
@@ -372,7 +374,9 @@ public class Follower extends AbstractRaftActorBehavior {
                 // the previous entry in it's in-memory journal
 
                 log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the "
-                        + "in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        appendEntries.getReplicatedToAllIndex(), lastIndex,
+                        context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
@@ -381,7 +385,9 @@ public class Follower extends AbstractRaftActorBehavior {
             final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
             if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).getIndex() - 1)) {
                 log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
-                        + "in-memory journal", logName(), entries.get(0).getIndex() - 1);
+                        + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+                        entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
 
                 sendOutOfSyncAppendEntriesReply(sender, false);
                 return true;
index 39909f2..4dd2b9b 100644 (file)
@@ -14,7 +14,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -259,7 +259,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     @Test
     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
-        logStart("testHandleFirstAppendEntries");
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -285,7 +285,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     @Test
     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
-        logStart("testHandleFirstAppendEntries");
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -511,7 +511,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     /**
-     * This test verifies that when an AppendEntries is received a specific prevLogTerm
+     * This test verifies that when an AppendEntries is received with a prevLogTerm
      * which does not match the term that is in RaftActors log entry at prevLogIndex
      * then the RaftActor does not change it's state and it returns a failure.
      */
@@ -521,13 +521,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         MockRaftActorContext context = createActorContext();
 
-        // First set the receivers term to lower number
-        context.getTermInformation().update(95, "test");
-
-        // AppendEntries is now sent with a bigger term
-        // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
-                (short)0);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -541,6 +535,28 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("isSuccess", false, reply.isSuccess());
     }
 
+    @Test
+    public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
+        logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
+        context.getReplicatedLog().setSnapshotIndex(4);
+        context.getReplicatedLog().setSnapshotTerm(3);
+
+        AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
+
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertEquals("isSuccess", true, reply.isSuccess());
+    }
+
     /**
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver match that the new

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.