//2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
//4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+ return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
}
@VisibleForTesting
// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot",
- logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
- context.getReplicatedLog().lastIndex());
+            LOG.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+                + "(snapshotIndex {}) - forcing install snapshot", logName(), followerLogInformation.getId(),
+                appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+                context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
+ LOG.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
"leader's {} - set the follower's next index to {}",
logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ LOG.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+ logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
if(appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+                LOG.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+                    + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+                    followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
followerLogInformation.decrNextIndex();
updated = true;
- LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+ LOG.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
followerLogInformation.getNextIndex());
}
}
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ LOG.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
if (reply.isSuccess()) {
if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshotReply received, " +
- "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
- logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
long followerMatchIndex = snapshot.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ LOG.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
installSnapshotState.markSendStatus(true);
}
} else {
- LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- logName(), reply.getChunkIndex());
+ LOG.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
installSnapshotState.markSendStatus(false);
}
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
+ if(!isHeartbeat && LOG.isDebugEnabled() || LOG.isTraceEnabled()) {
LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
- "follower-nextIndex: %d, leader-snapshot-index: %d, " +
- "leader-last-index: %d", logName(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
-
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
+ LOG.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
initiateCaptureSnapshot(followerId);
+                } else {
+                    // It doesn't seem like we should ever reach here - most likely indicates something is
+                    // wrong.
+ LOG.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
} else if(sendHeartbeat) {
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
return nextIndex == -1 ||
- (!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex));
+ !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex);
}
// Some other candidate for the same term became a leader and sent us an append entry
if(currentTerm() == appendEntries.getTerm()){
- LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
- logName(), currentTerm());
+ LOG.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
+ logName(), appendEntries.getLeaderId(), currentTerm());
return switchBehavior(new Follower(context));
}
if (voteCount >= votesRequired) {
if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
- LOG.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
- context.getReplicatedLog().lastIndex());
+ LOG.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
+ logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
return internalSwitchBehavior(RaftState.PreLeader);
} else {
return internalSwitchBehavior(RaftState.Leader);
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
// The raft paper does not say whether or not a Candidate can/should process a RequestVote in
long newTerm = currentTerm + 1;
context.getTermInformation().updateAndPersist(newTerm, context.getId());
- LOG.debug("{}: Starting new term {}", logName(), newTerm);
+ LOG.info("{}: Starting new election term {}", logName(), newTerm);
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
// We found that the log was out of sync so just send a negative
// reply and return
- LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
- logName(), lastIndex, lastTerm());
+ final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+ lastTerm(), context.getPayloadVersion());
- sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion()), actor());
+ LOG.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+
+ sender.tell(reply, actor());
return this;
}
if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
- LOG.debug("{}: Removing entries from log starting at {}", logName(),
- matchEntry.getIndex());
+ LOG.info("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex());
// Entries do not match so remove all subsequent entries
if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
// so we must send back a reply to force a snapshot to completely re-sync the
// follower's log and state.
- LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
+ LOG.info("{}: Could not remove entries - sending reply to force snapshot", logName());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ LOG.info("{}: The followers log is empty and the senders prevLogIndex is {}",
logName(), appendEntries.getPrevLogIndex());
} else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
+ LOG.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
} else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
+ LOG.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
"prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
- LOG.debug(
+ LOG.info(
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
&& !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
- LOG.debug(
+ LOG.info(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
} else {
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ LOG.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
+
+ LOG.info("{}: Snapshot installed from leader: {}", logName(), installSnapshot.getLeaderId());
+
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
new ArrayList<ReplicatedLogEntry>(),
installSnapshot.getLastIncludedIndex(),