From: Moiz Raja Date: Mon, 13 Jul 2015 20:31:42 +0000 (-0700) Subject: Fix AppendEntry logic when prevLogIndex and prevLogTerm is -1 X-Git-Tag: release/beryllium~397 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e9bf41afa2413cc4574a7ac97a55f2cb6c2de6b2 Fix AppendEntry logic when prevLogIndex and prevLogTerm is -1 When an AppendEntry arrives at a Follower with the prevLogIndex and prevLogTerm = -1 the Follower will accept that append entry and add it to the log. For a newly started Follower this can be problematic because this will be the first entry in that Followers log and so applying this entry to the Follower's state can end up corrupting the state or cause failures in committing transactions. To fix this we now verify if the replicatedToAllIndex is present in the Followers log. If it is present then the log is considered in sync else not. Change-Id: I09bead430f1a4556182263de54846792668cd27c Signed-off-by: Moiz Raja (cherry picked from commit 0efdc8e538815024cc54955be0b55ebec0321be6) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index ede0f6af2c..c1d261c561 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -116,55 +116,16 @@ public class Follower extends AbstractRaftActorBehavior { return this; } - // 1. Reply false if term < currentTerm (§5.1) - // This is handled in the appendEntries method of the base class - // If we got here then we do appear to be talking to the leader leaderId = appendEntries.getLeaderId(); setLeaderPayloadVersion(appendEntries.getPayloadVersion()); - // 2. Reply false if log doesn’t contain an entry at prevLogIndex - // whose term matches prevLogTerm (§5.3) - - long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); - boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); - updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); - - boolean outOfSync = true; - // First check if the logs are in sync or not long lastIndex = lastIndex(); - if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) { - // The follower's log is out of sync because the leader does have - // an entry at prevLogIndex and this follower has no entries in - // it's log. - - LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}", - logName(), appendEntries.getPrevLogIndex()); - } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) { - - // The follower's log is out of sync because the Leader's - // prevLogIndex entry was not found in it's log - - LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it", - logName(), appendEntries.getPrevLogIndex()); - } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) { - - // The follower's log is out of sync because the Leader's - // prevLogIndex entry does exist in the follower's log but it has - // a different term in it - - LOG.debug( - "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}", - logName(), prevLogTerm, appendEntries.getPrevLogTerm()); - } else { - outOfSync = false; - } - - if (outOfSync) { + if (isOutOfSync(appendEntries)) { // We found that the log was out of sync so just send a negative // reply and return @@ -271,6 +232,59 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + private boolean isOutOfSync(AppendEntries appendEntries) { + + long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); + boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); + long lastIndex = lastIndex(); + int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0; + boolean outOfSync = true; + + if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) { + + // The follower's log is out of sync because the leader does have + // an entry at prevLogIndex and this follower has no entries in + // it's log. + + LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}", + logName(), appendEntries.getPrevLogIndex()); + } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) { + + // The follower's log is out of sync because the Leader's + // prevLogIndex entry was not found in it's log + + LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it", + logName(), appendEntries.getPrevLogIndex()); + } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) { + + // The follower's log is out of sync because the Leader's + // prevLogIndex entry does exist in the follower's log but it has + // a different term in it + + LOG.debug( + "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}", + logName(), prevLogTerm, appendEntries.getPrevLogTerm()); + } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 + && appendEntries.getReplicatedToAllIndex() != -1 + && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) { + // This append entry comes from a leader who has it's log aggressively trimmed and so does not have + // the previous entry in it's in-memory journal + + LOG.debug( + "{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal", + logName(), appendEntries.getReplicatedToAllIndex()); + } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 + && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 && + !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){ + LOG.debug( + "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal", + logName(), appendEntries.getEntries().get(0).getIndex() - 1); + } else { + outOfSync = false; + } + return outOfSync; + } + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 1e931109c3..2cb0d7ce65 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -140,10 +140,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); List entries = Arrays.asList( newReplicatedLogEntry(2, 101, "foo")); + Assert.assertEquals(1, context.getReplicatedLog().size()); + // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -151,8 +156,105 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower.handleMessage(leaderActor, appendEntries); FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 105, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); } @Test