Bug in AbstractLeader replication consensus 22/32722/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 13 Jan 2016 21:14:27 +0000 (16:14 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 18 Jan 2016 18:16:43 +0000 (18:16 +0000)
I ran into an issue where the leader's commit index wasn't advancing
for new log entries even though consensus was reached. This scenario can
occur if the leader previously didn't get consensus and thus didn't commit
and apply a log entry and later regains leadership with a higher term.

The code in handleAppendEntriesReply doesn't update the commit index
if an entry's term doesn't match the current term. This behavior is correct
as per the raft paper - §5.4.1: "Raft never commits log entries from
previous terms by counting replicas". However the code also breaks out
of the loop and thus can never make progress on new entries in the current
term that reach consensus. This part is incorrect - as per raft "once an
entry from the current term is committed by counting replicas, then all
prior entries are committed indirectly". Therefore we need to continue
processing subsequent log entries in order to eventually make progress.

Change-Id: I2d093848c3a846e1f6420ac695b4ff652a65bf6b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index f1a178d0cac9fb51737b449edf7cbc3593cd745f..31ee9d2a7a61d3e2fa10fc6eb65544dbf7907b86 100644 (file)
@@ -261,11 +261,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
-                    context.setCommitIndex(N);
-                } else {
+                if (replicatedLogEntry == null) {
                     break;
                 }
+
+                // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+                // "Raft never commits log entries from previous terms by counting replicas".
+                // However we keep looping so we can make progress when new entries in the current term
+                // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+                // counting replicas, then all prior entries are committed indirectly".
+                if (replicatedLogEntry.getTerm() == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
             } else {
                 break;
             }
index b51f0a70b1dc7ef54da2f837b55354fdffaf02af..0733ad5479063b64ba4d23e5687f3df8fc0d34dc 100644 (file)
@@ -145,9 +145,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        return sendReplicate(actorContext, 1, index);
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, index, payload);
+                term, index, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
@@ -189,6 +193,58 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
     }
 
+    @Test
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+        logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        // The raft context is initialized with a couple log entries. However the commitIndex
+        // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+        // committed and applied. Now it regains leadership with a higher term (2).
+        long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+        long newTerm = prevTerm + 1;
+        actorContext.getTermInformation().update(newTerm, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower replies with the leader's current last index and term, simulating that it is
+        // up to date with the leader.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+        // The commit index should not get updated even though consensus was reached. This is b/c the
+        // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+        // from previous terms by counting replicas".
+        assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+        followerActor.underlyingActor().clear();
+
+        // Now replicate a new entry with the new term 2.
+        long newIndex = lastIndex + 1;
+        sendReplicate(actorContext, newTerm, newIndex);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+        // The follower replies with success. The leader should now update the commit index to the new index
+        // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+        // prior entries are committed indirectly".
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+        assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+    }
+
     @Test
     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");