import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import scala.concurrent.duration.FiniteDuration;
/**
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot = Optional.absent();
+ private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
- snapshot = initializeFromLeader.snapshot;
+ snapshotHolder = initializeFromLeader.snapshotHolder;
trackers.addAll(initializeFromLeader.trackers);
} else {
for (PeerInfo peerInfo: context.getPeers()) {
}
@VisibleForTesting
- void setSnapshot(@Nullable Snapshot snapshot) {
- if (snapshot != null) {
- this.snapshot = Optional.of(new SnapshotHolder(snapshot));
- } else {
- this.snapshot = Optional.absent();
- }
+ void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+ this.snapshotHolder = Optional.fromNullable(snapshotHolder);
}
@VisibleForTesting
boolean hasSnapshot() {
- return snapshot.isPresent();
+ return snapshotHolder.isPresent();
}
@Override
// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
- + "forcing install snaphot", logName(), followerLogInformation.getId(),
- appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+ log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+ + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+ appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ "leader's {} - set the follower's next index to {}", logName(),
followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+ logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// follower's last log index.
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+ log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+ + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
- followerLogInformation.decrNextIndex();
- updated = true;
+ if (followerLogInformation.decrNextIndex()) {
+ updated = true;
- log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
- followerLogInformation.getNextIndex());
+ log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ }
}
}
// set currentTerm = T, convert to follower (ยง5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ super.handleMessage(sender, message);
+ }
+
return internalSwitchBehavior(RaftState.Follower);
}
}
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
} else if (message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+ SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+ setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
sendInstallSnapshot();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
if (reply.isSuccess()) {
if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
- + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
- long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
installSnapshotState.markSendStatus(true);
}
} else {
- log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- logName(), reply.getChunkIndex());
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
installSnapshotState.markSendStatus(false);
}
- if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
- // Since the follower is now caught up try to purge the log.
- purgeInMemoryLog();
- } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+ if (wasLastChunk) {
+ if (!context.getSnapshotManager().isCapturing()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ }
+ } else {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
sendSnapshotChunk(followerActor, followerLogInformation);
/**
* This method checks if any update needs to be sent to the given follower. This includes append log entries,
* sending next snapshot chunk, and initiating a snapshot.
- *
- * @return true if any update is sent, false otherwise
*/
private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
boolean sendHeartbeat, boolean isHeartbeat) {
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
- + "follower-nextIndex: %d, leader-snapshot-index: %d, "
- + "leader-last-index: %d", logName(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
-
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
+ log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
initiateCaptureSnapshot(followerId);
+ } else {
+ // It doesn't seem like we should ever reach here - most likely indicates sonething is
+ // wrong.
+ log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
} else if (sendHeartbeat) {
*/
public boolean initiateCaptureSnapshot(String followerId) {
FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
// If a snapshot is present in the memory, most likely another install is in progress no need to capture
// snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
// Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
sendSnapshotChunk(followerActor, followerLogInfo);
return true;
- } else {
- boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- if (captureInitiated) {
- followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
- context.getConfigParams().getSnapshotChunkSize(), logName()));
- }
+ }
- return captureInitiated;
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
}
+
+ return captureInitiated;
}
private boolean canInstallSnapshot(long nextIndex) {
* InstallSnapshot should qualify as a heartbeat too.
*/
private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
- // Ensure the snapshot bytes are set - this is a no-op.
- installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+ try {
+ // Ensure the snapshot bytes are set - this is a no-op.
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
- byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+ if (!installSnapshotState.canSendNextChunk()) {
+ return;
+ }
- log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
- nextSnapshotChunk.length);
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
- int nextChunkIndex = installSnapshotState.incrementChunkIndex();
- Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if (installSnapshotState.isLastChunk(nextChunkIndex)) {
- serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
- }
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerLogInfo.getRaftVersion()),
- actor()
- );
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
- log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
- installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ nextSnapshotChunk,
+ nextChunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
+
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
}
}
return followerToLog.size();
}
- private static class SnapshotHolder {
+ static class SnapshotHolder {
private final long lastIncludedTerm;
private final long lastIncludedIndex;
- private final ByteString snapshotBytes;
+ private final ByteSource snapshotBytes;
- SnapshotHolder(Snapshot snapshot) {
+ SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
- this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ this.snapshotBytes = snapshotBytes;
}
long getLastIncludedTerm() {
return lastIncludedIndex;
}
- ByteString getSnapshotBytes() {
+ ByteSource getSnapshotBytes() {
return snapshotBytes;
}
}