import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
}
}
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
+ possiblyUpdateCommitIndex();
+
+ //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+ return this;
+ }
+
+ private void possiblyUpdateCommitIndex() {
+ // Figure out if we can update the the commitIndex as follows:
+ // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+ // and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
for (long index = context.getCommitIndex() + 1; ; index++) {
- int replicatedCount = 1;
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+ if (replicatedLogEntry == null) {
+ log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().size());
+ break;
+ }
+
+ // Count our entry if it has been persisted.
+ int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+ if (replicatedCount == 0) {
+ // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
+ // we should be able to update the commit index if we get a consensus amongst the followers
+ // w/o the local persistence ack however this can cause timing issues with snapshot capture
+ // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
+ // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
+ // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
+ // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
+ // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+ break;
+ }
log.trace("{}: checking Nth index {}", logName(), index);
for (FollowerLogInformation info : followerToLog.values()) {
}
if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
- if (replicatedLogEntry == null) {
- log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
- logName(), index, context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().size());
- break;
- }
-
// Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
// "Raft never commits log entries from previous terms by counting replicas".
// However we keep looping so we can make progress when new entries in the current term
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- if (log.isDebugEnabled()) {
- log.debug(
- "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
- logName(), followerId, context.getCommitIndex(), context.getLastApplied());
- }
+ log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+ context.getCommitIndex(), context.getLastApplied());
applyLogToStateMachine(context.getCommitIndex());
}
if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
-
- //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
- sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
- return this;
}
private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof CheckConsensusReached) {
+ possiblyUpdateCommitIndex();
} else {
return super.handleMessage(sender, message);
}