Fix AppendEntry logic when prevLogIndex and prevLogTerm is -1 46/24546/2
authorMoiz Raja <moraja@cisco.com>
Mon, 13 Jul 2015 20:31:42 +0000 (13:31 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 25 Jul 2015 01:22:00 +0000 (01:22 +0000)
When an AppendEntry arrives at a Follower with the prevLogIndex and
prevLogTerm = -1 the Follower will accept that append entry and add it
to the log. For a newly started Follower this can be problematic
because this will be the first entry in that Followers log and so
applying this entry to the Follower's state can end up corrupting the
state or cause failures in committing transactions.

To fix this we now verify if the replicatedToAllIndex is present in the
Followers log. If it is present then the log is considered in sync else
not.

Change-Id: I09bead430f1a4556182263de54846792668cd27c
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 0efdc8e538815024cc54955be0b55ebec0321be6)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index ede0f6af2c9cb11ce005aa76a9a047b97ef5d29c..c1d261c56192cb0b1bbe6905b77faa7d1d8fad0c 100644 (file)
@@ -116,55 +116,16 @@ public class Follower extends AbstractRaftActorBehavior {
             return this;
         }
 
-        // 1. Reply false if term < currentTerm (§5.1)
-        // This is handled in the appendEntries method of the base class
-
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
 
         setLeaderPayloadVersion(appendEntries.getPayloadVersion());
 
-        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
-        // whose term matches prevLogTerm (§5.3)
-
-        long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
-        boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
-
         updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
-
-        boolean outOfSync = true;
-
         // First check if the logs are in sync or not
         long lastIndex = lastIndex();
-        if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) {
 
-            // The follower's log is out of sync because the leader does have
-            // an entry at prevLogIndex and this follower has no entries in
-            // it's log.
-
-            LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
-                        logName(), appendEntries.getPrevLogIndex());
-        } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
-
-            // The follower's log is out of sync because the Leader's
-            // prevLogIndex entry was not found in it's log
-
-            LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
-                        logName(), appendEntries.getPrevLogIndex());
-        } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
-
-            // The follower's log is out of sync because the Leader's
-            // prevLogIndex entry does exist in the follower's log but it has
-            // a different term in it
-
-            LOG.debug(
-                "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}",
-                 logName(), prevLogTerm, appendEntries.getPrevLogTerm());
-        } else {
-            outOfSync = false;
-        }
-
-        if (outOfSync) {
+        if (isOutOfSync(appendEntries)) {
             // We found that the log was out of sync so just send a negative
             // reply and return
 
@@ -271,6 +232,59 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean isOutOfSync(AppendEntries appendEntries) {
+
+        long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+        boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
+        long lastIndex = lastIndex();
+        int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
+        boolean outOfSync = true;
+
+        if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) {
+
+            // The follower's log is out of sync because the leader does have
+            // an entry at prevLogIndex and this follower has no entries in
+            // it's log.
+
+            LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
+
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry was not found in it's log
+
+            LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
+
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry does exist in the follower's log but it has
+            // a different term in it
+
+            LOG.debug(
+                    "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}",
+                    logName(), prevLogTerm, appendEntries.getPrevLogTerm());
+        } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+                && appendEntries.getReplicatedToAllIndex() != -1
+                && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+            // This append entry comes from a leader who has it's log aggressively trimmed and so does not have
+            // the previous entry in it's in-memory journal
+
+            LOG.debug(
+                    "{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
+                    logName(), appendEntries.getReplicatedToAllIndex());
+        } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+                && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
+                !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+            LOG.debug(
+                    "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
+                    logName(), appendEntries.getEntries().get(0).getIndex() - 1);
+        } else {
+            outOfSync = false;
+        }
+        return outOfSync;
+    }
+
     @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
         return this;
index 1e931109c3e9234e73cf3c94f31b1a3e7ecf3978..2cb0d7ce6546ad93d6cb71196e0bf020a4450fb3 100644 (file)
@@ -140,10 +140,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
 
         List<ReplicatedLogEntry> entries = Arrays.asList(
                 newReplicatedLogEntry(2, 101, "foo"));
 
+        Assert.assertEquals(1, context.getReplicatedLog().size());
+
         // The new commitIndex is 101
         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
@@ -151,8 +156,105 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         follower.handleMessage(leaderActor, appendEntries);
 
         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 105, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
     }
 
     @Test