import org.opendaylight.controller.cluster.raft.Snapshot;
/**
- * Internal message, issued by follower to its actor
+ * Internal message, issued by follower to its actor.
*/
public class ApplySnapshot {
private static final Callback NOOP_CALLBACK = new Callback() {
this.installSnapshotInitiated = installSnapshotInitiated;
this.replicatedToAllIndex = replicatedToAllIndex;
this.replicatedToAllTerm = replicatedToAllTerm;
- this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
+ this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries :
+ Collections.<ReplicatedLogEntry>emptyList();
}
public long getLastAppliedIndex() {
.append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
.append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
.append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
- .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
+ .append(replicatedToAllTerm).append(", unAppliedEntries size=")
+ .append(unAppliedEntries.size()).append("]");
return builder.toString();
}
/**
* The FollowerInitialSyncUpStatus is sent by a Follower to inform any RaftActor subclass whether the Follower
- * is at least at the same commitIndex as the Leader was when it sent the follower the very first heartbeat.
- *
+ * is at least at the same commitIndex as the Leader was when it sent the follower the very first heart beat.
* This status can be used to determine if a Follower has caught up with the current Leader in an upgrade scenario
* for example.
*/
import java.io.Serializable;
/**
- * This messages is sent to the Leader to prompt it to send a heartbeat
- * to it's followers.
- *
- * Typically the Leader to itself on a schedule
+ * This messages is sent via a schedule to the Leader to prompt it to send a heart beat to its followers.
*/
public final class SendHeartBeat implements Serializable {
private static final long serialVersionUID = 1L;
import org.opendaylight.controller.cluster.raft.Snapshot;
/**
- * Internal message sent from the SnapshotManager to its associated leader. The leader is expected to apply the
- * {@link Snapshot} to its state.
+ * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to
+ * prompt the leader to install the snapshot on its followers as needed.
*/
public final class SendInstallSnapshot {
private final Snapshot snapshot;
this.snapshot = Preconditions.checkNotNull(snapshot);
}
- @Nonnull public Snapshot getSnapshot() {
+ @Nonnull
+ public Snapshot getSnapshot() {
return snapshot;
}
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("SwitchBehavior{");
- sb.append("newState=").append(newState);
- sb.append(", newTerm=").append(newTerm);
- sb.append('}');
- return sb.toString();
+ return "SwitchBehavior [newState=" + newState + ", newTerm=" + newTerm + "]";
}
}
* respond after entry applied to state machine (§5.3)
* <li> If last log index ≥ nextIndex for a follower: send
* AppendEntries RPC with log entries starting at nextIndex
- * <ul>
* <li> If successful: update nextIndex and matchIndex for
* follower (§5.3)
* <li> If AppendEntries fails because of log inconsistency:
* decrement nextIndex and retry (§5.3)
- * </ul>
* <li> If there exists an N such that N > commitIndex, a majority
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
+ * </ul>
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
/**
* Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
* expect the entries to be modified in sequence, hence we open-code the lookup.
- *
* TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
* but we already expect those to be far from frequent.
*/
@Nullable AbstractLeader initializeFromLeader) {
super(context, state);
- if(initializeFromLeader != null) {
+ if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
snapshot = initializeFromLeader.snapshot;
trackers.addAll(initializeFromLeader.trackers);
} else {
- for(PeerInfo peerInfo: context.getPeers()) {
+ for (PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
}
- LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
+ log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
context.getPeerInfo(followerId), -1, context);
followerToLog.put(followerId, followerLogInformation);
- if(heartbeatSchedule == null) {
+ if (heartbeatSchedule == null) {
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
}
public void updateMinReplicaCount() {
int numVoting = 0;
- for(PeerInfo peer: context.getPeers()) {
- if(peer.isVoting()) {
+ for (PeerInfo peer: context.getPeers()) {
+ if (peer.isVoting()) {
numVoting++;
}
}
minReplicationCount = getMajorityVoteCount(numVoting);
}
- protected int getMinIsolatedLeaderPeerCount(){
+ protected int getMinIsolatedLeaderPeerCount() {
//the isolated Leader peer count will be 1 less than the majority vote count.
//this is because the vote count has the self vote counted in it
//for e.g
//2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
//4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+ return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
}
@VisibleForTesting
void setSnapshot(@Nullable Snapshot snapshot) {
- if(snapshot != null) {
+ if (snapshot != null) {
this.snapshot = Optional.of(new SnapshotHolder(snapshot));
} else {
this.snapshot = Optional.absent();
protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+ log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
return this;
}
@Override
protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
- }
+ log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- if(followerLogInformation == null){
- LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
+ if (followerLogInformation == null) {
+ log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
return this;
}
- if(followerLogInformation.timeSinceLastActivity() >
- context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
- LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " +
- "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+ if (followerLogInformation.timeSinceLastActivity()
+ > context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+ log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
+ + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
context.getLastApplied(), context.getCommitIndex());
}
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
boolean updated = false;
- if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+ if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
// The follower's log is actually ahead of the leader's log. Normally this doesn't happen
// in raft as a node cannot become leader if it's log is behind another's. However, the
// non-voting semantics deviate a bit from raft. Only voting members participate in
// snapshot. It's also possible that the follower's last log index is behind the leader's.
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
- LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot",
- logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
- context.getReplicatedLog().lastIndex());
+ log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
+ + "forcing install snaphot", logName(), followerLogInformation.getId(),
+ appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
followerLogInformation.setMatchIndex(-1);
followerLogInformation.setNextIndex(-1);
updated = true;
} else if (appendEntriesReply.isSuccess()) {
- if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
- followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+ if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
+ && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
// The follower's last entry is present in the leader's journal but the terms don't match so the
// follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
// either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
followerLogInformation.setNextIndex(followerLastLogIndex - 1);
updated = true;
- LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
- "leader's {} - set the follower's next index to {}",
- logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+ log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ + "leader's {} - set the follower's next index to {}", logName(),
+ followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
} else {
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
}
} else {
- LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+ log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
- if(appendEntriesReply.isForceInstallSnapshot()) {
+ if (appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
followerLogInformation.setMatchIndex(-1);
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
- } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
- followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
+ } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
+ && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// the last snapshot, if any. We'll catch up the follower quickly by starting at the
followerLogInformation.decrNextIndex();
updated = true;
- LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+ log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
followerLogInformation.getNextIndex());
}
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
- for (long N = context.getCommitIndex() + 1; ; N++) {
+ for (long index = context.getCommitIndex() + 1; ; index++) {
int replicatedCount = 1;
- LOG.trace("{}: checking Nth index {}", logName(), N);
+ log.trace("{}: checking Nth index {}", logName(), index);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
+ if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
- } else if(LOG.isTraceEnabled()) {
- LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
info.getMatchIndex(), peerInfo);
}
}
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+ if (log.isTraceEnabled()) {
+ log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
+ minReplicationCount);
}
if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
if (replicatedLogEntry == null) {
- LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
- logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+ log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().size());
break;
}
// reach consensus, as per §5.4.1: "once an entry from the current term is committed by
// counting replicas, then all prior entries are committed indirectly".
if (replicatedLogEntry.getTerm() == currentTerm()) {
- LOG.trace("{}: Setting commit index to {}", logName(), N);
- context.setCommitIndex(N);
+ log.trace("{}: Setting commit index to {}", logName(), index);
+ context.setCommitIndex(index);
} else {
- LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
- logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
+ log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
+ + "term {} does not match the current term {}", logName(), index,
+ replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
- LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
+ log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
break;
}
}
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
- logName(), followerId, context.getCommitIndex(), context.getLastApplied());
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied());
}
applyLogToStateMachine(context.getCommitIndex());
boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
- if(updated && LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
- logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ if (updated && log.isDebugEnabled()) {
+ log.debug(
+ "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+ logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
}
return updated;
}
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- } else if(message instanceof SendInstallSnapshot) {
+ } else if (message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
- LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+ log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- if(followerLogInformation == null) {
+ if (followerLogInformation == null) {
// This can happen during AddServer if it times out.
- LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
if (installSnapshotState == null) {
- LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+ log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
boolean wasLastChunk = false;
if (reply.isSuccess()) {
- if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
+ if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshotReply received, " +
- "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
- logName(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
+ log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
+ + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
+ context.getReplicatedLog().getSnapshotIndex() + 1);
long followerMatchIndex = snapshot.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
- LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
+ log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
logName(), followerId, followerLogInformation.getMatchIndex(),
followerLogInformation.getNextIndex());
}
wasLastChunk = true;
- if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
+ if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
new UnInitializedFollowerSnapshotReply(followerId);
context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
- LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
+ log.debug("Sent message UnInitializedFollowerSnapshotReply to self");
}
} else {
installSnapshotState.markSendStatus(true);
}
} else {
- LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
logName(), reply.getChunkIndex());
installSnapshotState.markSendStatus(false);
purgeInMemoryLog();
} else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
- if(followerActor != null) {
+ if (followerActor != null) {
sendSnapshotChunk(followerActor, followerLogInformation);
}
}
} else {
- LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
logName(), reply.getChunkIndex(), followerId,
installSnapshotState.getChunkIndex());
- if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
+ if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
installSnapshotState.reset();
}
private boolean anyFollowersInstallingSnapshot() {
- for(FollowerLogInformation info: followerToLog.values()) {
- if(info.getInstallSnapshotState() != null) {
+ for (FollowerLogInformation info: followerToLog.values()) {
+ if (info.getInstallSnapshotState() != null) {
return true;
}
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+ log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
// Create a tracker entry we will use this later to notify the
// client actor
- if(replicate.getClientActor() != null) {
+ if (replicate.getClientActor() != null) {
trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
logIndex));
}
boolean applyModificationToState = !context.anyVotingPeers()
|| context.getRaftPolicy().applyModificationToStateBeforeConsensus();
- if(applyModificationToState){
+ if (applyModificationToState) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
}
final String followerId = e.getKey();
final FollowerLogInformation followerLogInformation = e.getValue();
// This checks helps not to send a repeat message to the follower
- if(!followerLogInformation.isFollowerActive() ||
- followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
+ if (!followerLogInformation.isFollowerActive()
+ || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
}
}
}
/**
- *
* This method checks if any update needs to be sent to the given follower. This includes append log entries,
* sending next snapshot chunk, and initiating a snapshot.
+ *
* @return true if any update is sent, false otherwise
*/
-
private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
boolean sendHeartbeat, boolean isHeartbeat) {
// if install snapshot is in process , then sent next chunk if possible
if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerLogInformation);
- } else if(sendHeartbeat) {
+ } else if (sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
- LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
+ log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
+ + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
+ followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
- LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- if(followerLogInformation.okToReplicate()) {
+ if (followerLogInformation.okToReplicate()) {
// Try to send all the entries in the journal but not exceeding the max data size
// for a single AppendEntries message.
int maxEntries = (int) context.getReplicatedLog().size();
context.getConfigParams().getSnapshotChunkSize());
sendAppendEntries = true;
}
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
+ } else if (isFollowerActive && followerNextIndex >= 0
+ && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
- "follower-nextIndex: %d, leader-snapshot-index: %d, " +
- "leader-last-index: %d", logName(), followerId,
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
+ + "follower-nextIndex: %d, leader-snapshot-index: %d, "
+ + "leader-last-index: %d", logName(), followerId,
followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
}
initiateCaptureSnapshot(followerId);
}
- } else if(sendHeartbeat) {
+ } else if (sendHeartbeat) {
// we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
sendAppendEntries = true;
}
- if(sendAppendEntries) {
+ if (sendAppendEntries) {
sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
}
}
getLogEntryTerm(followerNextIndex - 1), entries,
leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
- if(!entries.isEmpty() || LOG.isTraceEnabled()) {
- LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
+ if (!entries.isEmpty() || log.isTraceEnabled()) {
+ log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
appendEntries);
}
/**
* Initiates a snapshot capture to install on a follower.
- *
+ * <p/>
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
* 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
} else {
boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
- if(captureInitiated) {
+ if (captureInitiated) {
followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
context.getConfigParams().getSnapshotChunkSize(), logName()));
}
}
}
- private boolean canInstallSnapshot(long nextIndex){
+ private boolean canInstallSnapshot(long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return nextIndex == -1 ||
- (!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex));
+ return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex);
}
private void sendInstallSnapshot() {
- LOG.debug("{}: sendInstallSnapshot", logName());
+ log.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
long nextIndex = followerLogInfo.getNextIndex();
- if (followerLogInfo.getInstallSnapshotState() != null ||
- context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
- canInstallSnapshot(nextIndex)) {
+ if (followerLogInfo.getInstallSnapshotState() != null
+ || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
+ || canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, followerLogInfo);
}
}
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
nextSnapshotChunk.length);
int nextChunkIndex = installSnapshotState.incrementChunkIndex();
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+ if (installSnapshotState.isLastChunk(nextChunkIndex)) {
serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
}
actor()
);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
- installSnapshotState.getTotalChunks());
- }
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
}
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- LOG.trace("{}: Sending heartbeat", logName());
+ log.trace("{}: Sending heartbeat", logName());
sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
}
}
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
- if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
+ if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
return false;
import scala.concurrent.duration.FiniteDuration;
/**
- * Abstract class that represents the behavior of a RaftActor
- * <p/>
- * All Servers:
- * <ul>
- * <li> If commitIndex > lastApplied: increment lastApplied, apply
- * log[lastApplied] to state machine (§5.3)
- * <li> If RPC request or response contains term T > currentTerm:
- * set currentTerm = T, convert to follower (§5.1)
+ * Abstract class that provides common code for a RaftActor behavior.
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
/**
- * Information about the RaftActor whose behavior this class represents
+ * Information about the RaftActor whose behavior this class represents.
*/
protected final RaftActorContext context;
/**
- *
+ * Used for message logging.
*/
- protected final Logger LOG;
+ protected final Logger log;
/**
- *
+ * Prepended to log messages to provide appropriate context.
*/
- private Cancellable electionCancel = null;
-
- private long replicatedToAllIndex = -1;
-
private final String logName;
+ /**
+ * The RaftState corresponding to his behavior.
+ */
private final RaftState state;
+ /**
+ * Used to cancel a scheduled election.
+ */
+ private Cancellable electionCancel = null;
+
+ /**
+ * The index of the last log entry that has been replicated to all raft peers.
+ */
+ private long replicatedToAllIndex = -1;
+
AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
this.context = Preconditions.checkNotNull(context);
this.state = Preconditions.checkNotNull(state);
- this.LOG = context.getLogger();
+ this.log = context.getLogger();
logName = String.format("%s (%s)", context.getId(), state);
}
protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries);
-
/**
- * appendEntries first processes the AppendEntries message and then
- * delegates handling to a specific behavior
+ * Handles the common logic for the AppendEntries message and delegates handling to the derived class.
*
- * @param sender
- * @param appendEntries
+ * @param sender the ActorRef that sent the message
+ * @param appendEntries the message
* @return a new behavior if it was changed or the current behavior
*/
- protected RaftActorBehavior appendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
- logName(), appendEntries.getTerm(), currentTerm());
- }
+ log.debug("{}: Cannot append entries because sender term {} is less than {}", logName(),
+ appendEntries.getTerm(), currentTerm());
- sender.tell(
- new AppendEntriesReply(context.getId(), currentTerm(), false,
- lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
- );
+ sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
+ context.getPayloadVersion()), actor());
return this;
}
AppendEntriesReply appendEntriesReply);
/**
- * requestVote handles the RequestVote message. This logic is common
- * for all behaviors
+ * Handles the logic for the RequestVote message that is common for all behaviors.
*
- * @param sender
- * @param requestVote
+ * @param sender the ActorRef that sent the message
+ * @param requestVote the message
* @return a new behavior if it was changed or the current behavior
*/
protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
- LOG.debug("{}: In requestVote: {}", logName(), requestVote);
+ log.debug("{}: In requestVote: {}", logName(), requestVote);
boolean grantVote = canGrantVote(requestVote);
- if(grantVote) {
+ if (grantVote) {
context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId());
}
RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
- LOG.debug("{}: requestVote returning: {}", logName(), reply);
+ log.debug("{}: requestVote returning: {}", logName(), reply);
sender.tell(reply, actor());
return this;
}
- protected boolean canGrantVote(RequestVote requestVote){
+ protected boolean canGrantVote(RequestVote requestVote) {
boolean grantVote = false;
// Reply false if term < currentTerm (§5.1)
// more up-to-date.
if (requestVote.getLastLogTerm() > lastTerm()) {
candidateLatest = true;
- } else if ((requestVote.getLastLogTerm() == lastTerm())
+ } else if (requestVote.getLastLogTerm() == lastTerm()
&& requestVote.getLastLogIndex() >= lastIndex()) {
candidateLatest = true;
}
RequestVoteReply requestVoteReply);
/**
+ * Returns a duration for election with an additional variance for randomness.
*
* @return a random election duration
*/
}
/**
- * stop the scheduled election
+ * Stops the currently scheduled election.
*/
protected void stopElection() {
if (electionCancel != null && !electionCancel.isCancelled()) {
}
/**
- * schedule a new election
+ * Schedule a new election.
*
* @param interval the duration after which we should trigger a new election
*/
}
/**
+ * Returns the current election term.
+ *
* @return the current term
*/
protected long currentTerm() {
}
/**
+ * Returns the id of the candidate that this server voted for in current term.
+ *
* @return the candidate for whom we voted in the current term
*/
protected String votedFor() {
}
/**
- * @return the actor associated with this behavior
+ * Returns the actor associated with this behavior.
+ *
+ * @return the actor
*/
protected ActorRef actor() {
return context.getActor();
}
/**
+ * Returns the term of the last entry in the log.
*
- * @return the term from the last entry in the log
+ * @return the term
*/
protected long lastTerm() {
return context.getReplicatedLog().lastTerm();
}
/**
- * @return the index from the last entry in the log
+ * Returns the index of the last entry in the log.
+ *
+ * @return the index
*/
protected long lastIndex() {
return context.getReplicatedLog().lastIndex();
}
/**
- * @param logIndex
- * @return the client request tracker for the specified logIndex
+ * Removes and returns the ClientRequestTracker for the specified log index.
+ * @param logIndex the log index
+ * @return the ClientRequestTracker or null if none available
*/
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
return null;
}
/**
+ * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
*
- * @return the log entry index for the given index or -1 if not found
+ * @return the log entry index or -1 if not found
*/
- protected long getLogEntryIndex(long index){
- if(index == context.getReplicatedLog().getSnapshotIndex()){
+ protected long getLogEntryIndex(long index) {
+ if (index == context.getReplicatedLog().getSnapshotIndex()) {
return context.getReplicatedLog().getSnapshotIndex();
}
ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
- if(entry != null){
+ if (entry != null) {
return entry.getIndex();
}
}
/**
- * @return the log entry term for the given index or -1 if not found
+ * Returns the actual term of the entry in replicated log for the given index or -1 if not found.
+ *
+ * @return the log entry term or -1 if not found
*/
- protected long getLogEntryTerm(long index){
- if(index == context.getReplicatedLog().getSnapshotIndex()){
+ protected long getLogEntryTerm(long index) {
+ if (index == context.getReplicatedLog().getSnapshotIndex()) {
return context.getReplicatedLog().getSnapshotTerm();
}
ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
- if(entry != null){
+ if (entry != null) {
return entry.getTerm();
}
}
/**
- * Apply the provided index to the state machine
+ * Applies the log entries up to the specified index that is known to be committed to the state machine.
*
- * @param index a log index that is known to be committed
+ * @param index the log index
*/
protected void applyLogToStateMachine(final long index) {
long newLastApplied = context.getLastApplied();
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
logName(), i, i, index);
break;
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
- }
+
+ log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
+
context.setLastApplied(newLastApplied);
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
return internalSwitchBehavior(createBehavior(context, newState));
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
- if(!context.getRaftPolicy().automaticElectionsEnabled()) {
+ if (!context.getRaftPolicy().automaticElectionsEnabled()) {
return this;
}
- LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
+ log.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
try {
close();
- } catch (Exception e) {
- LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
+ } catch (RuntimeException e) {
+ log.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
}
return newBehavior;
}
/**
- * Performs a snapshot with no capture on the replicated log.
- * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+ * Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or
+ * lastApplied-1 which ever is minimum.
*
- * @param snapshotCapturedIndex
+ * @param snapshotCapturedIndex the index from which to clear
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
- if(actualIndex != -1){
+ if (actualIndex != -1) {
setReplicatedToAllIndex(actualIndex);
}
}
- protected String getId(){
+ protected String getId() {
return context.getId();
}
}
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
- * The behavior of a RaftActor when it is in the CandidateState
+ * The behavior of a RaftActor when it is in the Candidate raft state.
* <p/>
* Candidates (§5.2):
* <ul>
public Candidate(RaftActorContext context) {
super(context, RaftState.Candidate);
- for(PeerInfo peer: context.getPeers()) {
- if(peer.isVoting()) {
+ for (PeerInfo peer: context.getPeers()) {
+ if (peer.isVoting()) {
votingPeers.add(peer.getId());
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
- }
+ log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
votesRequired = getMajorityVoteCount(votingPeers.size());
startNewTerm();
- if(votingPeers.isEmpty()){
+ if (votingPeers.isEmpty()) {
actor().tell(ElectionTimeout.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
@Override
- protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
- }
+ log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
// Some other candidate for the same term became a leader and sent us an append entry
- if(currentTerm() == appendEntries.getTerm()){
- LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
+ if (currentTerm() == appendEntries.getTerm()) {
+ log.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
logName(), currentTerm());
return switchBehavior(new Follower(context));
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
- LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
+ log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
voteCount++;
}
if (voteCount >= votesRequired) {
- if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
- LOG.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
+ if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+ log.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
context.getReplicatedLog().lastIndex());
return internalSwitchBehavior(RaftState.PreLeader);
} else {
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout", logName());
+ log.debug("{}: Received ElectionTimeout", logName());
if (votesRequired == 0) {
// If there are no peers then we should be a Leader
RaftRPC rpc = (RaftRPC) message;
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+ log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
context.getTermInformation().getCurrentTerm());
- }
// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
long newTerm = currentTerm + 1;
context.getTermInformation().updateAndPersist(newTerm, context.getId());
- LOG.debug("{}: Starting new term {}", logName(), newTerm);
+ log.debug("{}: Starting new term {}", logName(), newTerm);
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
// amount of time TBD
for (String peerId : votingPeers) {
ActorSelection peerActor = context.getPeerActorSelection(peerId);
- if(peerActor != null) {
+ if (peerActor != null) {
RequestVote requestVote = new RequestVote(
context.getTermInformation().getCurrentTerm(),
context.getId(),
context.getReplicatedLog().lastIndex(),
context.getReplicatedLog().lastTerm());
- LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+ log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
peerActor.tell(requestVote, context.getActor());
}
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
/**
- * The behavior of a RaftActor in the Follower state
+ * The behavior of a RaftActor in the Follower raft state.
* <p/>
* <ul>
* <li> Respond to RPCs from candidates and leaders
private final SyncStatusTracker initialSyncStatusTracker;
private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
- logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+ logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
lastLeaderMessageTimer.start();
}
- private boolean isLogEntryPresent(long index){
- if(context.getReplicatedLog().isInSnapshot(index)) {
+ private boolean isLogEntryPresent(long index) {
+ if (context.getReplicatedLog().isInSnapshot(index)) {
return true;
}
}
- private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
- initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
+ private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) {
+ initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex());
}
@Override
protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
- } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
- LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+ } else if (log.isDebugEnabled() && numLogEntries > 0) {
+ log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
}
// TODO : Refactor this method into a bunch of smaller methods
// cover the code properly
if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
- LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " +
- "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+ log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
+ + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
snapshotTracker = null;
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
- }
+ log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
sender.tell(reply, actor());
return this;
// We found that the log was out of sync so just send a negative
// reply and return
- LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
+ log.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
logName(), lastIndex, lastTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
- LOG.debug("{}: Number of entries to be appended = {}", logName(),
+ log.debug("{}: Number of entries to be appended = {}", logName(),
appendEntries.getEntries().size());
// 3. If an existing entry conflicts with a new one (same index
for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
- if(!isLogEntryPresent(matchEntry.getIndex())) {
+ if (!isLogEntryPresent(matchEntry.getIndex())) {
// newEntry not found in the log
break;
}
long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex());
- LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
+ log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
existingEntryTerm);
// existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
// what the term was so we'll assume it matches.
- if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
+ if (existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
continue;
}
- if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+ if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
- LOG.debug("{}: Removing entries from log starting at {}", logName(),
+ log.debug("{}: Removing entries from log starting at {}", logName(),
matchEntry.getIndex());
// Entries do not match so remove all subsequent entries
- if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
+ if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
// Could not remove the entries - this means the matchEntry index must be in the
// snapshot and not the log. In this case the prior entries are part of the state
// so we must send back a reply to force a snapshot to completely re-sync the
// follower's log and state.
- LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
+ log.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true), actor());
return this;
}
lastIndex = lastIndex();
- LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
+ log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
lastIndex, addEntriesFrom);
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
- LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
+ log.debug("{}: Append entry to log {}", logName(), entry.getData());
context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
- if(entry.getData() instanceof ServerConfigurationPayload) {
+ if (entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
}
}
- LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+ log.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
}
// 5. If leaderCommit > commitIndex, set commitIndex =
lastIndex = lastIndex();
long prevCommitIndex = context.getCommitIndex();
- if(appendEntries.getLeaderCommit() > prevCommitIndex) {
+ if (appendEntries.getLeaderCommit() > prevCommitIndex) {
context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
}
if (prevCommitIndex != context.getCommitIndex()) {
- LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
+ log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
}
// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (§5.3)
// check if there are any entries to be applied. last-applied can be equal to last-index
- if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
- context.getLastApplied() < lastIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: applyLogToStateMachine, " +
- "appendEntries.getLeaderCommit(): {}," +
- "context.getLastApplied(): {}, lastIndex(): {}", logName(),
+ if (appendEntries.getLeaderCommit() > context.getLastApplied()
+ && context.getLastApplied() < lastIndex) {
+ if (log.isDebugEnabled()) {
+ log.debug("{}: applyLogToStateMachine, appendEntries.getLeaderCommit(): {},"
+ + "context.getLastApplied(): {}, lastIndex(): {}", logName(),
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex);
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex, lastTerm(), context.getPayloadVersion());
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
- } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
- LOG.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+ } else if (log.isDebugEnabled() && numLogEntries > 0) {
+ log.debug("{}: handleAppendEntries returning : {}", logName(), reply);
}
sender.tell(reply, actor());
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ log.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
logName(), appendEntries.getPrevLogIndex());
} else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
- logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
+ log.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - "
+ + "lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+ context.getReplicatedLog().getSnapshotIndex());
} else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
- "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
- prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
- } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+ log.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries"
+ + "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
+ prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
+ context.getReplicatedLog().getSnapshotIndex());
+ } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
&& !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
- LOG.debug(
- "{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
- logName(), appendEntries.getReplicatedToAllIndex());
- } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+ log.debug("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the"
+ + " in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+ } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
&& !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
- LOG.debug(
- "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
- logName(), appendEntries.getEntries().get(0).getIndex() - 1);
+ log.debug("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
+ + " in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1);
} else {
outOfSync = false;
}
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ log.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
// lastLeaderMessageTimer.
long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
- boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
- lastLeaderMessageInterval >= electionTimeoutInMillis;
+ boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning()
+ || lastLeaderMessageInterval >= electionTimeoutInMillis;
- if(canStartElection()) {
- if(message instanceof TimeoutNow) {
- LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
+ if (canStartElection()) {
+ if (message instanceof TimeoutNow) {
+ log.debug("{}: Received TimeoutNow - switching to Candidate", logName());
return internalSwitchBehavior(RaftState.Candidate);
- } else if(noLeaderMessageReceived) {
+ } else if (noLeaderMessageReceived) {
// Check the cluster state to see if the leader is known to be up before we go to Candidate.
// However if we haven't heard from the leader in a long time even though the cluster state
// indicates it's up then something is wrong - leader might be stuck indefinitely - so switch
// to Candidate,
long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
- if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+ if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+ log.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
scheduleElection(electionDuration());
} else {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ log.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
return internalSwitchBehavior(RaftState.Candidate);
}
} else {
- LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+ log.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
scheduleElection(electionDuration());
}
- } else if(message instanceof ElectionTimeout) {
- if(noLeaderMessageReceived) {
+ } else if (message instanceof ElectionTimeout) {
+ if (noLeaderMessageReceived) {
setLeaderId(null);
}
}
private boolean isLeaderAvailabilityKnown() {
- if(leaderId == null) {
+ if (leaderId == null) {
return false;
}
Optional<Cluster> cluster = context.getCluster();
- if(!cluster.isPresent()) {
+ if (!cluster.isPresent()) {
return false;
}
ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
- if(leaderActor == null) {
+ if (leaderActor == null) {
return false;
}
CurrentClusterState state = cluster.get().state();
Set<Member> unreachable = state.getUnreachable();
- LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+ log.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
unreachable);
- for(Member m: unreachable) {
- if(leaderAddress.equals(m.address())) {
- LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+ for (Member m: unreachable) {
+ if (leaderAddress.equals(m.address())) {
+ log.info("{}: Leader {} is unreachable", logName(), leaderAddress);
return false;
}
}
- for(Member m: state.getMembers()) {
- if(leaderAddress.equals(m.address())) {
- if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
- LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+ for (Member m: state.getMembers()) {
+ if (leaderAddress.equals(m.address())) {
+ if (m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+ log.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
leaderAddress, m.status());
return true;
} else {
- LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+ log.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
leaderAddress, m.status());
return false;
}
}
}
- LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+ log.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
return false;
}
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
+ log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
leaderId = installSnapshot.getLeaderId();
- if(snapshotTracker == null){
- snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
+ if (snapshotTracker == null) {
+ snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
}
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
final InstallSnapshotReply reply = new InstallSnapshotReply(
currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
- if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
- installSnapshot.getLastChunkHashCode())){
+ if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())) {
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
new ArrayList<>(),
installSnapshot.getLastIncludedIndex(),
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
public void onSuccess() {
- LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
sender.tell(reply, actor());
}
snapshotTracker = null;
} else {
- LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
sender.tell(reply, actor());
}
} catch (SnapshotTracker.InvalidChunkException e) {
- LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
+ log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-1, false), actor());
snapshotTracker = null;
- } catch (Exception e){
- LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e);
-
- //send reply with success as false. The chunk will be sent again on failure
- sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- installSnapshot.getChunkIndex(), false), actor());
-
}
}
}
@VisibleForTesting
- SnapshotTracker getSnapshotTracker(){
+ SnapshotTracker getSnapshotTracker() {
return snapshotTracker;
}
}
// it can happen that this isolated leader interacts with a new leader in the cluster and
// changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
- LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
+ log.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
return internalSwitchBehavior(new Leader(context, this));
}
return ret;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
/**
- * The behavior of a RaftActor when it is in the Leader state
+ * The behavior of a RaftActor when it is in the Leader state.
* <p/>
* Leaders:
* <ul>
* respond after entry applied to state machine (§5.3)
* <li> If last log index ≥ nextIndex for a follower: send
* AppendEntries RPC with log entries starting at nextIndex
- * <ul>
* <li> If successful: update nextIndex and matchIndex for
* follower (§5.3)
* <li> If AppendEntries fails because of log inconsistency:
* decrement nextIndex and retry (§5.3)
- * </ul>
* <li> If there exists an N such that N > commitIndex, a majority
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
+ * </ul>
*/
public class Leader extends AbstractLeader {
/**
static final Object ISOLATED_LEADER_CHECK = new Object();
private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
- private @Nullable LeadershipTransferContext leadershipTransferContext;
+ @Nullable private LeadershipTransferContext leadershipTransferContext;
Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
super(context, RaftState.Leader, initializeFromLeader);
if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
if (isLeaderIsolated()) {
- LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
return internalSwitchBehavior(new IsolatedLeader(context, this));
} else {
}
@Override
- protected void beforeSendHeartbeat(){
- if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ protected void beforeSendHeartbeat() {
+ if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
+ > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
isolatedLeaderCheck.reset().start();
}
- if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
+ if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
- LOG.debug("{}: Leadership transfer expired", logName());
+ log.debug("{}: Leadership transfer expired", logName());
leadershipTransferContext = null;
}
}
* {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
* </ul>
*
- * @param leadershipTransferCohort
+ * @param leadershipTransferCohort the cohort participating in the leadership transfer
*/
public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
- LOG.debug("{}: Attempting to transfer leadership", logName());
+ log.debug("{}: Attempting to transfer leadership", logName());
leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
}
private void tryToCompleteLeadershipTransfer(String followerId) {
- if(leadershipTransferContext == null) {
+ if (leadershipTransferContext == null) {
return;
}
FollowerLogInformation followerInfo = getFollower(followerId);
- if(followerInfo == null) {
+ if (followerInfo == null) {
return;
}
long lastIndex = context.getReplicatedLog().lastIndex();
boolean isVoting = context.getPeerInfo(followerId).isVoting();
- LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
+ log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
- if(isVoting && followerInfo.getMatchIndex() == lastIndex) {
- LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
+ if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
+ log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
// We can't be sure if the follower has applied all its log entries to its state so send an
// additional AppendEntries with the latest commit index.
ActorSelection followerActor = context.getPeerActorSelection(followerId);
followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
- LOG.debug("{}: Leader transfer complete", logName());
+ log.debug("{}: Leader transfer complete", logName());
leadershipTransferContext.transferCohort.transferComplete();
leadershipTransferContext = null;
@Override
public void close() {
- if(leadershipTransferContext != null) {
+ if (leadershipTransferContext != null) {
LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
leadershipTransferContext = null;
localLeadershipTransferContext.transferCohort.abortTransfer();
}
boolean isExpired(long timeout) {
- if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
+ if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
transferCohort.abortTransfer();
return true;
}
* the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed
* indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the
* normal Leader state.
- * <p>
+ * <p/>
* The use of a no-op entry in this manner is outlined in the last paragraph in §8 of the
* <a href="https://raft.github.io/raft.pdf">extended raft version</a>.
*
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
if (message instanceof ApplyState) {
- if(context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
+ if (context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
// We've applied all entries - we can switch to Leader.
return internalSwitchBehavior(new Leader(context, this));
} else {
import org.opendaylight.controller.cluster.raft.RaftState;
/**
- * A RaftActorBehavior represents the specific behavior of a RaftActor
- * <p>
- * A RaftActor can behave as one of the following,
- * <ul>
- * <li> Follower </li>
- * <li> Candidate </li>
- * <li> Leader </li>
- * </ul>
- * <p>
- * In each of these behaviors the Raft Actor handles the same Raft messages
- * differently.
+ * The interface for a class that implements a specific behavior of a RaftActor. The types of behaviors are enumerated
+ * by {@link RaftState}. Each handles the same Raft messages differently.
*/
-public interface RaftActorBehavior extends AutoCloseable{
+public interface RaftActorBehavior extends AutoCloseable {
/**
* Handle a message. If the processing of the message warrants a state
*
* @return The new behavior or current behavior, or null if the message was not handled.
*/
- @Nullable RaftActorBehavior handleMessage(ActorRef sender, Object message);
+ @Nullable
+ RaftActorBehavior handleMessage(ActorRef sender, Object message);
/**
+ * Returns the state associated with this behavior.
*
- * @return The state associated with a given behavior
+ * @return the RaftState
*/
RaftState state();
/**
+ * Returns the id of the leader.
*
- * @return The Id of the Leader if known else null
+ * @return the id of the leader or null if not known
*/
+ @Nullable
String getLeaderId();
/**
- * setting the index of the log entry which is replicated to all nodes
- * @param replicatedToAllIndex
+ * Sets the index of the last log entry that has been replicated to all peers.
+ *
+ * @param replicatedToAllIndex the index
*/
void setReplicatedToAllIndex(long replicatedToAllIndex);
/**
- * @return the index of the log entry which is replicated to all nodes
+ * Returns the index of the last log entry that has been replicated to all peers.
+ *
+ * @return the index or -1 if not known
*/
long getReplicatedToAllIndex();
/**
- * @return the leader's payload data version.
+ * Returns the leader's payload data version.
+ *
+ * @return a short representing the version
*/
short getLeaderPayloadVersion();
/**
- * switchBehavior makes sure that the current behavior is shutdown before it switches to the new
- * behavior
+ * Closes the current behavior and switches to the specified behavior, if possible.
*
- * @param behavior The new behavior to switch to
- * @return The new behavior
+ * @param behavior the new behavior to switch to
+ * @return the new behavior
*/
RaftActorBehavior switchBehavior(RaftActorBehavior behavior);
import org.slf4j.Logger;
/**
- * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower.
*/
public class SnapshotTracker {
- private final Logger LOG;
+ private final Logger log;
private final int totalChunks;
private final String leaderId;
private ByteString collectedChunks = ByteString.EMPTY;
private boolean sealed = false;
private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
- SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
- this.LOG = LOG;
+ SnapshotTracker(Logger log, int totalChunks, String leaderId) {
+ this.log = log;
this.totalChunks = totalChunks;
this.leaderId = Preconditions.checkNotNull(leaderId);
}
/**
- * Adds a chunk to the tracker
+ * Adds a chunk to the tracker.
*
- * @param chunkIndex
- * @param chunk
- * @return true when the lastChunk is received
- * @throws InvalidChunkException
+ * @param chunkIndex the index of the chunk
+ * @param chunk the chunk data
+ * @param lastChunkHashCode the optional hash code for the chunk
+ * @return true if this is the last chunk is received
+ * @throws InvalidChunkException if the chunk index is invalid or out of order
*/
- boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
- LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
+ boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> maybeLastChunkHashCode)
+ throws InvalidChunkException {
+ log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
- if(sealed){
- throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+ if (sealed) {
+ throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
+ + " all chunks already received");
}
- if(lastChunkIndex + 1 != chunkIndex){
+ if (lastChunkIndex + 1 != chunkIndex) {
throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
}
- if(lastChunkHashCode.isPresent()){
- if(lastChunkHashCode.get() != this.lastChunkHashCode){
- throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
- "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get());
- }
+ if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.get() != this.lastChunkHashCode) {
+ throw new InvalidChunkException("The hash code of the recorded last chunk does not match "
+ + "the senders hash code, expected " + this.lastChunkHashCode + " was "
+ + maybeLastChunkHashCode.get());
}
sealed = chunkIndex == totalChunks;
return sealed;
}
- byte[] getSnapshot(){
- if(!sealed) {
+ byte[] getSnapshot() {
+ if (!sealed) {
throw new IllegalStateException("lastChunk not received yet");
}
return collectedChunks.toByteArray();
}
- ByteString getCollectedChunks(){
+ ByteString getCollectedChunks() {
return collectedChunks;
}
public static class InvalidChunkException extends Exception {
private static final long serialVersionUID = 1L;
- InvalidChunkException(String message){
+ InvalidChunkException(String message) {
super(message);
}
}
-
}
this.syncThreshold = syncThreshold;
}
- public void update(String leaderId, long leaderCommit, long commitIndex){
- leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null");
+ public void update(String leaderId, long leaderCommit, long commitIndex) {
+ Preconditions.checkNotNull(leaderId, "leaderId should not be null");
- if(!leaderId.equals(syncedLeaderId)){
+ if (!leaderId.equals(syncedLeaderId)) {
minimumExpectedIndex = leaderCommit;
changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE);
syncedLeaderId = leaderId;
return;
}
- if((leaderCommit - commitIndex) > syncThreshold){
+ if (leaderCommit - commitIndex > syncThreshold) {
changeSyncStatus(NOT_IN_SYNC);
- } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) {
+ } else if (leaderCommit - commitIndex <= syncThreshold && commitIndex >= minimumExpectedIndex) {
changeSyncStatus(IN_SYNC);
}
}
- private void changeSyncStatus(boolean newSyncStatus){
+ private void changeSyncStatus(boolean newSyncStatus) {
changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE);
}
- private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){
- if(syncStatus == newSyncStatus && !forceStatusChange){
+ private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange) {
+ if (syncStatus == newSyncStatus && !forceStatusChange) {
return;
}
actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
syncStatus = newSyncStatus;
}
-}
\ No newline at end of file
+}
* Request to locate the leader raft actor. Each {@link org.opendaylight.controller.cluster.raft.RaftActor} must
* respond with a {@link FindLeaderReply} containing the address of the leader, as it is known to that particular
* actor.
- *
+ * <p/>
* This message is intended for testing purposes only.
*/
@VisibleForTesting
* Reply to {@link FindLeader} message, containing the address of the leader actor, as known to the raft actor which
* sent the message. If the responding actor does not have knowledge of the leader, {@link #getLeaderActor()} will
* return {@link Optional#empty()}.
- *
+ * <p/>
* This message is intended for testing purposes only.
*/
@VisibleForTesting
import java.util.Map;
/**
- * The response to a GetOnDemandRaftState message,
+ * The response to a GetOnDemandRaftState message.
*
* @author Thomas Pantelis
*/
// term
private long term;
- protected AbstractRaftRPC(long term){
+ protected AbstractRaftRPC(long term) {
this.term = term;
}
// term of prevLogIndex entry
private final long prevLogTerm;
- // log entries to store (empty for heartbeat;
- // may send more than one for efficiency)
+ // log entries to store (empty for heart beat - may send more than one for efficiency)
private transient List<ReplicatedLogEntry> entries;
// leader's commitIndex
private AppendEntries appendEntries;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
}
out.writeShort(appendEntries.payloadVersion);
out.writeInt(appendEntries.entries.size());
- for(ReplicatedLogEntry e: appendEntries.entries) {
+ for (ReplicatedLogEntry e: appendEntries.entries) {
out.writeLong(e.getIndex());
out.writeLong(e.getTerm());
out.writeObject(e.getData());
int size = in.readInt();
List<ReplicatedLogEntry> entries = new ArrayList<>(size);
- for(int i = 0; i < size; i++) {
+ for (int i = 0; i < size; i++) {
entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
}
import org.opendaylight.controller.cluster.raft.RaftVersions;
/**
- * Reply for the AppendEntriesRpc message
+ * Reply for the AppendEntries message.
*/
public class AppendEntriesReply extends AbstractRaftRPC {
private static final long serialVersionUID = -7487547356392536683L;
private AppendEntriesReply appendEntriesReply;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
}
import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+/**
+ * Message sent from a leader to install a snapshot chunk on a follower.
+ */
public class InstallSnapshot extends AbstractRaftRPC {
private static final long serialVersionUID = 1L;
private final Optional<ServerConfigurationPayload> serverConfig;
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data,
- int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode, Optional<ServerConfigurationPayload> serverConfig) {
+ int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode,
+ Optional<ServerConfigurationPayload> serverConfig) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
private InstallSnapshot installSnapshot;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
}
out.writeInt(installSnapshot.totalChunks);
out.writeByte(installSnapshot.lastChunkHashCode.isPresent() ? 1 : 0);
- if(installSnapshot.lastChunkHashCode.isPresent()) {
+ if (installSnapshot.lastChunkHashCode.isPresent()) {
out.writeInt(installSnapshot.lastChunkHashCode.get().intValue());
}
out.writeByte(installSnapshot.serverConfig.isPresent() ? 1 : 0);
- if(installSnapshot.serverConfig.isPresent()) {
+ if (installSnapshot.serverConfig.isPresent()) {
out.writeObject(installSnapshot.serverConfig.get());
}
Optional<Integer> lastChunkHashCode = Optional.absent();
boolean chunkHashCodePresent = in.readByte() == 1;
- if(chunkHashCodePresent) {
+ if (chunkHashCodePresent) {
lastChunkHashCode = Optional.of(in.readInt());
}
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
boolean serverConfigPresent = in.readByte() == 1;
- if(serverConfigPresent) {
+ if (serverConfigPresent) {
serverConfig = Optional.of((ServerConfigurationPayload)in.readObject());
}
private InstallSnapshotReply installSnapshotReply;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("RequestVoteReply [term=").append(getTerm()).append(", voteGranted=").append(voteGranted).append("]");
- return builder.toString();
+ return "RequestVoteReply [term=" + getTerm() + ", voteGranted=" + voteGranted + "]";
}
}
@Override
public String toString() {
- return "ServerRemoved{" +
- "serverId='" + serverId + '\'' +
- '}';
+ return "ServerRemoved [serverId=" + serverId + "]";
}
}
package org.opendaylight.controller.cluster.raft.messages;
/**
- * Local message sent to self on receiving InstallSnapshotReply from a follower, this message indicates that
- * the catchup of the follower is done succesfully during AddServer scenario
+ * Local message sent to self on receiving the InstallSnapshotReply from a follower indicating that
+ * the catch up of the follower has completed successfully for an AddServer operation.
*/
public class UnInitializedFollowerSnapshotReply {
private final String followerId;
- public UnInitializedFollowerSnapshotReply(String followerId){
- this.followerId = followerId;
+ public UnInitializedFollowerSnapshotReply(String followerId) {
+ this.followerId = followerId;
}
public String getFollowerId() {
private ApplyJournalEntries applyEntries;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
out.writeLong(applyEntries.toIndex);
- }
+ }
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
private DeleteEntries deleteEntries;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
out.writeLong(deleteEntries.fromIndex);
- }
+ }
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
private List<ServerInfo> serverConfig;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
out.writeObject(i.getId());
out.writeBoolean(i.isVoting());
}
- }
+ }
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
private UpdateElectionTerm updateElectionTerm;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
public void writeExternal(final ObjectOutput out) throws IOException {
out.writeLong(updateElectionTerm.currentTerm);
out.writeObject(updateElectionTerm.votedFor);
- }
+ }
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
* we may want to be able to determine which Raft replica should become the leader - with Raft elections are
* randomized so it is not possible to specify which replica should be the leader. The ability to specify
* the leader would be quite useful when testing a raft cluster.
- *
+ * <p/>
* Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification
* is only applied to the state when the modification is replicated to a majority of the replicas. The ability to
* apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes
/**
* According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a
- * majority of it's followers
+ * majority of it's followers.
*
* @return true if modification should be applied before consensus, false to apply modification to state
- * as per Raft
+ * as per Raft
*/
boolean applyModificationToStateBeforeConsensus();
}