if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
+ "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
- snapshotTracker = null;
+ closeSnapshotTracker();
}
if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
}
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex, lastTerm(), context.getPayloadVersion());
+
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+ } else if (log.isDebugEnabled() && numLogEntries > 0) {
+ log.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+ }
+
+ // Reply to the leader before applying any previous state so as not to hold up leader consensus.
+ sender.tell(reply, actor());
+
// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (ยง5.3)
// check if there are any entries to be applied. last-applied can be equal to last-index
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
- AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex, lastTerm(), context.getPayloadVersion());
-
- if (log.isTraceEnabled()) {
- log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
- } else if (log.isDebugEnabled() && numLogEntries > 0) {
- log.debug("{}: handleAppendEntries returning : {}", logName(), reply);
- }
-
- sender.tell(reply, actor());
-
if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
leaderId = installSnapshot.getLeaderId();
if (snapshotTracker == null) {
- snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
+ snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId(),
+ context);
}
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());