From: Tom Pantelis Date: Wed, 13 Jan 2016 21:14:27 +0000 (-0500) Subject: Bug in AbstractLeader replication consensus X-Git-Tag: release/beryllium~12 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=10ef5303f76d27271def9296d511916f26b6649c Bug in AbstractLeader replication consensus I ran into an issue where the leader's commit index wasn't advancing for new log entries even though consensus was reached. This scenario can occur if the leader previously didn't get consensus and thus didn't commit and apply a log entry and later regains leadership with a higher term. The code in handleAppendEntriesReply doesn't update the commit index if an entry's term doesn't match the current term. This behavior is correct as per the raft paper - §5.4.1: "Raft never commits log entries from previous terms by counting replicas". However the code also breaks out of the loop and thus can never make progress on new entries in the current term that reach consensus. This part is incorrect - as per raft "once an entry from the current term is committed by counting replicas, then all prior entries are committed indirectly". Therefore we need to continue processing subsequent log entries in order to eventually make progress. 
Change-Id: I2d093848c3a846e1f6420ac695b4ff652a65bf6b Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index f1a178d0ca..31ee9d2a7a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -261,11 +261,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) { - context.setCommitIndex(N); - } else { + if (replicatedLogEntry == null) { break; } + + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". 
+ if (replicatedLogEntry.getTerm() == currentTerm()) { + context.setCommitIndex(N); + } } else { break; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index b51f0a70b1..0733ad5479 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -145,9 +145,13 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + return sendReplicate(actorContext, 1, index); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, index, payload); + term, index, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); } @@ -189,6 +193,58 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); } + @Test + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + // The raft context is initialized with a couple log entries. However the commitIndex + // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't + // committed and applied. Now it regains leadership with a higher term (2). 
+ long prevTerm = actorContext.getTermInformation().getCurrentTerm(); + long newTerm = prevTerm + 1; + actorContext.getTermInformation().update(newTerm, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower replies with the leader's current last index and term, simulating that it is + // up to date with the leader. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0)); + + // The commit index should not get updated even though consensus was reached. This is b/c the + // last entry's term does not match the current term. As per §5.4.1, "Raft never commits log entries + // from previous terms by counting replicas". + assertEquals("Commit Index", -1, actorContext.getCommitIndex()); + + followerActor.underlyingActor().clear(); + + // Now replicate a new entry with the new term 2. + long newIndex = lastIndex + 1; + sendReplicate(actorContext, newTerm, newIndex); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + + // The follower replies with success. 
The leader should now update the commit index to the new index + // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all + // prior entries are committed indirectly". + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0)); + + assertEquals("Commit Index", newIndex, actorContext.getCommitIndex()); + } + @Test public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");