long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
- if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ if(appendEntriesReply.isForceInstallSnapshot()) {
+ // Reset the followers match and next index. This is to signal that this follower has nothing
+ // in common with this Leader and so would require a snapshot to be installed
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ // Force initiate a snapshot capture
+ initiateCaptureSnapshot(followerId);
+ } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return switchBehavior(new Follower(context));
+ return internalSwitchBehavior(RaftState.Follower);
}
}
logIndex)
);
- if (followerToLog.isEmpty()) {
+ boolean applyModificationToState = followerToLog.isEmpty()
+ || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+ if(applyModificationToState){
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
- } else {
+ }
+
+ if (!followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
- initiateCaptureSnapshot(followerId, followerNextIndex);
+ if (canInstallSnapshot(followerNextIndex)) {
+ initiateCaptureSnapshot(followerId);
+ }
} else if(sendHeartbeat) {
// we send an AppendEntries, even if the follower is inactive
* 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
* then send the existing snapshot in chunks to the follower.
* @param followerId
- * @param followerNextIndex
*/
- private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
- context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
-
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
- final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+ private void initiateCaptureSnapshot(String followerId) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot.
+ // This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
- } else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- }
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
+ private boolean canInstallSnapshot(long nextIndex){
+ // If the follower's nextIndex is -1 then we might as well send it a snapshot
+ // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+ // in the snapshot
+ return (nextIndex == -1 ||
+ (!context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex)));
+
+ }
+
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
if (followerActor != null) {
long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ if (canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, e.getKey());
}
}