// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
- + "forcing install snaphot", logName(), followerLogInformation.getId(),
- appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+ log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+ + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+ appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ "leader's {} - set the follower's next index to {}", logName(),
followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+ logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+ log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+ + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
followerLogInformation.decrNextIndex();
updated = true;
- log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+ log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
followerLogInformation.getNextIndex());
}
}
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
if (reply.isSuccess()) {
if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
- + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
installSnapshotState.markSendStatus(true);
}
} else {
- log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- logName(), reply.getChunkIndex());
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
installSnapshotState.markSendStatus(false);
}
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
- + "follower-nextIndex: %d, leader-snapshot-index: %d, "
- + "leader-last-index: %d", logName(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
-
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
+ log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
initiateCaptureSnapshot(followerId);
+ } else {
+ // It doesn't seem like we should ever reach here - most likely indicates sonething is
+ // wrong.
+ log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
} else if (sendHeartbeat) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
- log.debug("{}: Cannot append entries because sender term {} is less than {}", logName(),
+ log.info("{}: Cannot append entries because sender's term {} is less than {}", logName(),
appendEntries.getTerm(), currentTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
return this;
}
- log.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
+ log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(),
+ newBehavior.state(), context.getTermInformation().getCurrentTerm());
try {
close();
} catch (RuntimeException e) {
// Some other candidate for the same term became a leader and sent us an append entry
if (currentTerm() == appendEntries.getTerm()) {
- log.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
- logName(), currentTerm());
+ log.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
+ logName(), appendEntries.getLeaderId(), currentTerm());
return switchBehavior(new Follower(context));
}
if (voteCount >= votesRequired) {
if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
- log.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
- context.getReplicatedLog().lastIndex());
+ log.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
+ logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
return internalSwitchBehavior(RaftState.PreLeader);
} else {
return internalSwitchBehavior(RaftState.Leader);
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ log.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
// The raft paper does not say whether or not a Candidate can/should process a RequestVote in
long newTerm = currentTerm + 1;
context.getTermInformation().updateAndPersist(newTerm, context.getId());
- log.debug("{}: Starting new term {}", logName(), newTerm);
+ log.info("{}: Starting new election term {}", logName(), newTerm);
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
// We found that the log was out of sync so just send a negative
// reply and return
- log.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
- logName(), lastIndex, lastTerm());
+ final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+ lastTerm(), context.getPayloadVersion());
- sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion()), actor());
+ log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+
+ sender.tell(reply, actor());
return this;
}
if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
- log.debug("{}: Removing entries from log starting at {}", logName(),
- matchEntry.getIndex());
+ log.info("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex());
// Entries do not match so remove all subsequent entries
if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
// so we must send back a reply to force a snapshot to completely re-sync the
// follower's log and state.
- log.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
+ log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- log.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ log.info("{}: The followers log is empty and the senders prevLogIndex is {}",
logName(), appendEntries.getPrevLogIndex());
} else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- log.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - "
+ log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - "
+ "lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
context.getReplicatedLog().getSnapshotIndex());
} else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- log.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries"
+ log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries"
+ "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
context.getReplicatedLog().getSnapshotIndex());
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
- log.debug("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the"
+ log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the"
+ " in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
} else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
&& !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
- log.debug("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
+ log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
+ " in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1);
} else {
outOfSync = false;
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- log.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())) {
+
+ log.info("{}: Snapshot installed from leader: {}", logName(), installSnapshot.getLeaderId());
+
Snapshot snapshot = Snapshot.create(
context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes()),
new ArrayList<>(),