import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
- + "forcing install snaphot", logName(), followerLogInformation.getId(),
- appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+ log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+ + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+ appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ "leader's {} - set the follower's next index to {}", logName(),
followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+ logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+ log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+ + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
- followerLogInformation.decrNextIndex();
- updated = true;
+ if (followerLogInformation.decrNextIndex()) {
+ updated = true;
- log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
- followerLogInformation.getNextIndex());
+ log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ }
}
}
// set currentTerm = T, convert to follower (ยง5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ super.handleMessage(sender, message);
+ }
+
return internalSwitchBehavior(RaftState.Follower);
}
}
if (reply.isSuccess()) {
if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
- + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
installSnapshotState.markSendStatus(true);
}
} else {
- log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- logName(), reply.getChunkIndex());
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
installSnapshotState.markSendStatus(false);
}
- if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
- // Since the follower is now caught up try to purge the log.
- purgeInMemoryLog();
- } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+ if (wasLastChunk) {
+ if (!context.getSnapshotManager().isCapturing()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ }
+ } else {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
sendSnapshotChunk(followerActor, followerLogInformation);
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
- + "follower-nextIndex: %d, leader-snapshot-index: %d, "
- + "leader-last-index: %d", logName(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
-
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
+ log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
initiateCaptureSnapshot(followerId);
+ } else {
+ // It doesn't seem like we should ever reach here - most likely indicates sonething is
+ // wrong.
+ log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
} else if (sendHeartbeat) {
followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
- // Ensure the snapshot bytes are set - this is a no-op.
- installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+ try {
+ // Ensure the snapshot bytes are set - this is a no-op.
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
- byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+ if (!installSnapshotState.canSendNextChunk()) {
+ return;
+ }
- log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
- nextSnapshotChunk.length);
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
- int nextChunkIndex = installSnapshotState.incrementChunkIndex();
- Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if (installSnapshotState.isLastChunk(nextChunkIndex)) {
- serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
- }
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshotHolder.get().getLastIncludedIndex(),
- snapshotHolder.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerLogInfo.getRaftVersion()),
- actor()
- );
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
- log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
- installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ nextSnapshotChunk,
+ nextChunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
+
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
}
}
static class SnapshotHolder {
private final long lastIncludedTerm;
private final long lastIncludedIndex;
- private final ByteString snapshotBytes;
+ private final ByteSource snapshotBytes;
SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
- try {
- this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
- } catch (IOException e) {
- throw new RuntimeException("Error reading state", e);
- }
+ this.snapshotBytes = snapshotBytes;
}
long getLastIncludedTerm() {
return lastIncludedIndex;
}
- ByteString getSnapshotBytes() {
+ ByteSource getSnapshotBytes() {
return snapshotBytes;
}
}