import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
/**
* The behavior of a RaftActor when it is in the Leader state
- * <p/>
+ *
+ * <p>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
* follower (§5.3)
* <li> If AppendEntries fails because of log inconsistency:
* decrement nextIndex and retry (§5.3)
- * <li> If there exists an N such that N > commitIndex, a majority
+ * <li> If there exists an N such that N > commitIndex, a majority
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
* </ul>
}
}
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
+ possiblyUpdateCommitIndex();
+
+ // Send the next log entry immediately, if possible; no need to wait for a heartbeat to trigger that event.
+ sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+ return this;
+ }
+
+ private void possiblyUpdateCommitIndex() {
+ // Figure out if we can update the commitIndex as follows:
+ // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+ // and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
for (long index = context.getCommitIndex() + 1; ; index++) {
- int replicatedCount = 1;
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+ if (replicatedLogEntry == null) {
+ log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().size());
+ break;
+ }
+
+ // Count our entry if it has been persisted.
+ int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+ if (replicatedCount == 0) {
+ // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+ // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+ // amongst the followers w/o the local persistence ack.
+ break;
+ }
log.trace("{}: checking Nth index {}", logName(), index);
for (FollowerLogInformation info : followerToLog.values()) {
}
if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
- if (replicatedLogEntry == null) {
- log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
- logName(), index, context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().size());
- break;
- }
-
// Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
// "Raft never commits log entries from previous terms by counting replicas".
// However we keep looping so we can make progress when new entries in the current term
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- if (log.isDebugEnabled()) {
- log.debug(
- "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
- logName(), followerId, context.getCommitIndex(), context.getLastApplied());
- }
+ log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+ context.getCommitIndex(), context.getLastApplied());
applyLogToStateMachine(context.getCommitIndex());
}
if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
-
- //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
- sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
- return this;
}
private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof CheckConsensusReached) {
+ possiblyUpdateCommitIndex();
} else {
return super.handleMessage(sender, message);
}
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
- replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+ log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+ replicate.isSendImmediate());
// Create a tracker entry we will use this later to notify the
// client actor
applyLogToStateMachine(logIndex);
}
- if (!followerToLog.isEmpty()) {
+ if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
/**
* This method checks if any update needs to be sent to the given follower. This includes append log entries,
* sending next snapshot chunk, and initiating a snapshot.
- *
- * @return true if any update is sent, false otherwise
*/
private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
boolean sendHeartbeat, boolean isHeartbeat) {
/**
* Initiates a snapshot capture to install on a follower.
- * <p/>
+ *
+ * <p>
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
* 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
// Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
sendSnapshotChunk(followerActor, followerLogInfo);
return true;
- } else {
- boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- if (captureInitiated) {
- followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
- context.getConfigParams().getSnapshotChunkSize(), logName()));
- }
+ }
- return captureInitiated;
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
}
+
+ return captureInitiated;
}
private boolean canInstallSnapshot(long nextIndex) {