In some CSIT runs I see messages such as "The prevLogIndex 10 was
found in the log but the term -1 is not equal to the append entries
prevLogTerm 2" which results in the Follower sending an unsuccessful
reply indicating it's out-of-sync. This causes the Leader to try to
catch it up.
However, in the test cases, the Follower isn't actually
out-of-sync but rather it gets behind enough (the test issues 1000
txns per second per node) such that the leader re-sends AppendEntries
with log entries it already sent. When the Follower eventually receives
the first AppendEntries, it appends the entries and may do a "fake"
snapshot, i.e. trim the in-memory journal list and advance the snapshot
index. If so, when the duplicate AppendEntries is received, the leader's
previous log index is no longer in the Follower's journal log.
In Follower.isOutOfSync, isLogEntryPresent returns true because the leader's
previous index is included in the last snapshot even though it was trimmed
from the journal log. However, getLogEntryTerm returns -1 because it
checks if the index is in the journal log or equal to the snapshot
index, i.e. it doesn't take into account that the index may be in the
last snapshot. This is inconsistent with the first check and results in
the Follower reporting that it's out-of-sync when it really isn't.
I've also seen a negative result of this on the Leader side when a Follower
reports it's out-of-sync - the Follower's last log index is in the snapshot
but getLogEntryTerm returns -1, causing the Leader to take the last-resort,
inefficient path of decrementing the Follower's next index in order to
catch it up.
Therefore, I changed these two cases to also check if the index is in the snapshot
and, if so, use the snapshot term.
Change-Id: I4331d8fee85789a7004a692abb1b9c629eecd570
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
c77c163d54872612b47f0f2550931fa408ed596b)
@Override
public boolean isInSnapshot(long logEntryIndex) {
- return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
+ return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
@Override
followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
- long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
boolean updated = false;
if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
// The follower's log is actually ahead of the leader's log. Normally this doesn't happen
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
- + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
- appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
- context.getReplicatedLog().getSnapshotIndex());
+ + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+ followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+ context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
updated = true;
} else if (appendEntriesReply.isSuccess()) {
+ long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
&& followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
// The follower's last entry is present in the leader's journal but the terms don't match so the
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
- logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
+ + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+ context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
+ getReplicatedToAllIndex());
+ long followersLastLogTermInLeadersLogOrSnapshot = getLogEntryOrSnapshotTerm(followerLastLogIndex);
if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
- } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
- && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
- // The follower's log is empty or the last entry is present in the leader's journal
- // and the terms match so the follower is just behind the leader's journal from
- // the last snapshot, if any. We'll catch up the follower quickly by starting at the
- // follower's last log index.
+ } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLogOrSnapshot >= 0
+ && followersLastLogTermInLeadersLogOrSnapshot == appendEntriesReply.getLogLastTerm()) {
+ // The follower's log is empty or the follower's last entry is present in the leader's journal or
+ // snapshot and the terms match so the follower is just behind the leader's journal from the last
+ // snapshot, if any. We'll catch up the follower quickly by starting at the follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
logName(), followerId, appendEntriesReply.getLogLastTerm(),
- followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
}
}
}
}
/**
- * Returns the actual term of the entry in replicated log for the given index or -1 if not found.
+ * Returns the actual term of the entry in the replicated log for the given index or -1 if not found.
*
* @return the log entry term or -1 if not found
*/
return -1;
}
+ /**
+ * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the
+ * snapshot term if the given index is in the snapshot or -1 otherwise.
+ *
+ * @return the term or -1 otherwise
+ */
+ protected long getLogEntryOrSnapshotTerm(final long index) {
+ if (context.getReplicatedLog().isInSnapshot(index)) {
+ return context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ return getLogEntryTerm(index);
+ }
+
/**
* Applies the log entries up to the specified index that is known to be committed to the state machine.
*
if (lastIndex > -1) {
if (isLogEntryPresent(appendEntries.getPrevLogIndex())) {
- final long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
- if (prevLogTerm != appendEntries.getPrevLogTerm()) {
+ final long leadersPrevLogTermInFollowersLogOrSnapshot =
+ getLogEntryOrSnapshotTerm(appendEntries.getPrevLogIndex());
+ if (leadersPrevLogTermInFollowersLogOrSnapshot != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's prevLogIndex entry does exist
- // in the follower's log but it has a different term in it
+ // in the follower's log or snapshot but it has a different term.
log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append "
- + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(),
- appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
- context.getReplicatedLog().getSnapshotIndex());
+ + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot,
+ appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false);
return true;
// The follower's log is out of sync because the Leader's prevLogIndex entry was not found in it's log
log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, "
- + "snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
- context.getReplicatedLog().getSnapshotIndex());
+ + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+ context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false);
return true;
// the previous entry in it's in-memory journal
log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the "
- + "in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+ + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ appendEntries.getReplicatedToAllIndex(), lastIndex,
+ context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false);
return true;
final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).getIndex() - 1)) {
log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
- + "in-memory journal", logName(), entries.get(0).getIndex() - 1);
+ + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false);
return true;
@Test
public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
- logStart("testHandleFirstAppendEntries");
+ logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
MockRaftActorContext context = createActorContext();
context.getReplicatedLog().clear(0,2);
@Test
public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
- logStart("testHandleFirstAppendEntries");
+ logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
MockRaftActorContext context = createActorContext();
context.getReplicatedLog().clear(0,2);
}
/**
- * This test verifies that when an AppendEntries is received a specific prevLogTerm
+ * This test verifies that when an AppendEntries is received with a prevLogTerm
* which does not match the term that is in RaftActors log entry at prevLogIndex
* then the RaftActor does not change it's state and it returns a failure.
*/
MockRaftActorContext context = createActorContext();
- // First set the receivers term to lower number
- context.getTermInformation().update(95, "test");
-
- // AppendEntries is now sent with a bigger term
- // this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
- (short)0);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
follower = createBehavior(context);
assertEquals("isSuccess", false, reply.isSuccess());
}
+ @Test
+ public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
+ logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
+ context.getReplicatedLog().setSnapshotIndex(4);
+ context.getReplicatedLog().setSnapshotTerm(3);
+
+ AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
+
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals("isSuccess", true, reply.isSuccess());
+ }
+
/**
* This test verifies that when a new AppendEntries message is received with
* new entries and the logs of the sender and receiver match that the new