summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
b25ae93)
Since the commitIndex may move in chunks we really want to update
our sync status after we have gone through the AppendEntries message
so our commitIndex reflects the state after processing.
Change-Id: I49c72a21f8d9c3efb7ae9cc1b64276220057f2e2
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
bdb818fbfc5f015ab14883348f170cca8ce79128)
leaderId = appendEntries.getLeaderId();
leaderPayloadVersion = appendEntries.getPayloadVersion();
leaderId = appendEntries.getLeaderId();
leaderPayloadVersion = appendEntries.getPayloadVersion();
- updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
// First check if the logs are in sync or not
long lastIndex = lastIndex();
// First check if the logs are in sync or not
long lastIndex = lastIndex();
lastTerm(), context.getPayloadVersion());
log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
lastTerm(), context.getPayloadVersion());
log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+ updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
sender.tell(reply, actor());
return this;
}
sender.tell(reply, actor());
return this;
}
// follower's log and state.
log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
// follower's log and state.
log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
+ updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
+ updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
}
// Reply to the leader before applying any previous state so as not to hold up leader consensus.
}
// Reply to the leader before applying any previous state so as not to hold up leader consensus.
+ updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
sender.tell(reply, actor());
// If commitIndex > lastApplied: increment lastApplied, apply
sender.tell(reply, actor());
// If commitIndex > lastApplied: increment lastApplied, apply