import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
- + "forcing install snaphot", logName(), followerLogInformation.getId(),
- appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+ log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+ + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+ appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ "leader's {} - set the follower's next index to {}", logName(),
followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+ logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+ log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+ + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
- followerLogInformation.decrNextIndex();
- updated = true;
+ if (followerLogInformation.decrNextIndex()) {
+ updated = true;
- log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
- followerLogInformation.getNextIndex());
+ log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ }
}
}
// set currentTerm = T, convert to follower (ยง5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ super.handleMessage(sender, message);
+ }
+
return internalSwitchBehavior(RaftState.Follower);
}
}
if (reply.isSuccess()) {
if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
- + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
installSnapshotState.markSendStatus(true);
}
} else {
- log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- logName(), reply.getChunkIndex());
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
installSnapshotState.markSendStatus(false);
}
- if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
- // Since the follower is now caught up try to purge the log.
- purgeInMemoryLog();
- } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+ if (wasLastChunk) {
+ if (!context.getSnapshotManager().isCapturing()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ }
+ } else {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
sendSnapshotChunk(followerActor, followerLogInformation);
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
- + "follower-nextIndex: %d, leader-snapshot-index: %d, "
- + "leader-last-index: %d", logName(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
-
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
+ log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
initiateCaptureSnapshot(followerId);
+ } else {
+                        // It doesn't seem like we should ever reach here - most likely indicates something is
+ // wrong.
+ log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
} else if (sendHeartbeat) {
// Ensure the snapshot bytes are set - this is a no-op.
installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+ if (!installSnapshotState.canSendNextChunk()) {
+ return;
+ }
+
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),