final @Nullable AbstractLeader initializeFromLeader) {
super(context, state);
- appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+ appendEntriesMessageSlicer = MessageSlicer.builder()
+ .logContext(logName)
.messageSliceSize(context.getConfigParams().getMaximumMessageSliceSize())
- .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
- TimeUnit.MILLISECONDS).build();
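+ // expire any in-progress slicing state after three election timeouts of inactivity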
+ .expireStateAfterInactivity(
+ context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS)
+ .build();
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
}
}
- log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
+ log.debug("{}: Election: Leader has following peers: {}", logName, getFollowerIds());
updateMinReplicaCount();
}
@Override
- protected RaftActorBehavior handleAppendEntries(final ActorRef sender,
- final AppendEntries appendEntries) {
-
- log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
-
+ final RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+ log.debug("{}: handleAppendEntries: {}", logName, appendEntries);
return this;
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
- final AppendEntriesReply appendEntriesReply) {
- log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
+ RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
+ log.trace("{}: handleAppendEntriesReply: {}", logName, appendEntriesReply);
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
if (followerLogInformation == null) {
- log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
+ log.error("{}: handleAppendEntriesReply - unknown follower {}", logName, followerId);
return this;
}
final var followerRaftVersion = appendEntriesReply.getRaftVersion();
if (followerRaftVersion < RaftVersions.FLUORINE_VERSION) {
- log.warn("{}: handleAppendEntriesReply - ignoring reply from follower {} raft version {}", logName(),
+ log.warn("{}: handleAppendEntriesReply - ignoring reply from follower {} raft version {}", logName,
followerId, followerRaftVersion);
return this;
}
if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
+ "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
- logName(), appendEntriesReply, TimeUnit.NANOSECONDS.toMillis(lastActivityNanos),
+ logName, appendEntriesReply, TimeUnit.NANOSECONDS.toMillis(lastActivityNanos),
context.getLastApplied(), context.getCommitIndex());
}
// However in this case the log terms won't match and the logs will conflict - this is handled
// elsewhere.
log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
- + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+ + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName,
followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
updated = true;
log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
- + "leader's {} - set the follower's next index to {}", logName(),
+ + "leader's {} - set the follower's next index to {}", logName,
followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
} else {
}
} else {
log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
- + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+ + "snapshotTerm: {}, replicatedToAllIndex: {}", logName, appendEntriesReply,
context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
getReplicatedToAllIndex());
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
- + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ + "updated: matchIndex: {}, nextIndex: {}", logName, followerId,
followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
} else {
// The follower's log conflicts with leader's log so decrement follower's next index
updated = true;
log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
- logName(), followerId, appendEntriesReply.getLogLastTerm(),
+ logName, followerId, appendEntriesReply.getLogLastTerm(),
followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
}
}
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
- logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+ logName, followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
possiblyUpdateCommitIndex();
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
if (replicatedLogEntry == null) {
log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
- logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ logName, index, context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().size());
break;
}
break;
}
- log.trace("{}: checking Nth index {}", logName(), index);
+ log.trace("{}: checking Nth index {}", logName, index);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
} else if (log.isTraceEnabled()) {
- log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName, info.getId(),
info.getMatchIndex(), peerInfo);
}
}
if (log.isTraceEnabled()) {
- log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
+ log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName, replicatedCount,
minReplicationCount);
}
// reach consensus, as per §5.4.1: "once an entry from the current term is committed by
// counting replicas, then all prior entries are committed indirectly".
if (replicatedLogEntry.term() == currentTerm()) {
- log.trace("{}: Setting commit index to {}", logName(), index);
+ log.trace("{}: Setting commit index to {}", logName, index);
context.setCommitIndex(index);
} else {
log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
- + "term {} does not match the current term {}", logName(), index,
+ + "term {} does not match the current term {}", logName, index,
replicatedLogEntry.index(), replicatedLogEntry.term(), currentTerm());
}
} else {
- log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
+ log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName, replicatedCount);
break;
}
}
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+ log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName,
context.getCommitIndex(), context.getLastApplied());
applyLogToStateMachine(context.getCommitIndex());
if (updated && log.isDebugEnabled()) {
log.debug(
"{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
- logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ logName, followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
followerLogInformation.getNextIndex());
}
return updated;
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
+ RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
return this;
}
&& shouldUpdateTerm(rpc)) {
log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+ logName, rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
// of other nodes available that can elect the requesting candidate. Since we're transferring
// leadership, we should make every effort to get the requesting node elected.
if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
- log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName);
requestVote(sender, requestVote);
}
}
private void handleInstallSnapshotReply(final InstallSnapshotReply reply) {
- log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+ log.debug("{}: handleInstallSnapshotReply: {}", logName, reply);
final var followerId = reply.getFollowerId();
final var followerLogInfo = followerToLog.get(followerId);
if (followerLogInfo == null) {
// This can happen during AddServer if it times out.
- log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(),
+ log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName,
followerId);
return;
}
final var installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
- log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", logName(),
+ log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", logName,
followerId);
return;
}
final var replyChunkIndex = reply.getChunkIndex();
if (replyChunkIndex != expectedChunkIndex) {
log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
- logName(), replyChunkIndex, followerId, expectedChunkIndex);
+ logName, replyChunkIndex, followerId, expectedChunkIndex);
if (replyChunkIndex == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
// Since the Follower did not find this index to be valid we should reset the follower snapshot
}
if (!reply.isSuccess()) {
- log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName, reply);
installSnapshotState.markSendStatus(false);
sendNextSnapshotChunk(followerId, followerLogInfo);
return;
}
if (!installSnapshotState.isLastChunk(replyChunkIndex)) {
- log.debug("{}: Success InstallSnapshotReply from {}, sending next chunk", logName(), followerId);
+ log.debug("{}: Success InstallSnapshotReply from {}, sending next chunk", logName, followerId);
installSnapshotState.markSendStatus(true);
sendNextSnapshotChunk(followerId, followerLogInfo);
return;
followerLogInfo.clearLeaderInstallSnapshotState();
log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - matchIndex set to {}, "
- + "nextIndex set to {}", logName(), followerId, replyChunkIndex, followerLogInfo.getMatchIndex(),
+ + "nextIndex set to {}", logName, followerId, replyChunkIndex, followerLogInfo.getMatchIndex(),
followerLogInfo.getNextIndex());
if (!anyFollowersInstallingSnapshot()) {
private void replicate(final Replicate replicate) {
final long logIndex = replicate.logIndex();
- log.debug("{}: Replicate message: identifier: {}, logIndex: {}, isSendImmediate: {}", logName(),
+ log.debug("{}: Replicate message: identifier: {}, logIndex: {}, isSendImmediate: {}", logName,
replicate.identifier(), logIndex, replicate.sendImmediate());
// Create a tracker entry; we will use this later to notify the
if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
- + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
+ + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName, followerId, isFollowerActive,
followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
- log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ log.debug("{}: sendAppendEntries: {} is present for follower {}", logName,
followerNextIndex, followerId);
if (followerLogInformation.okToReplicate(context.getCommitIndex())) {
sendAppendEntries = true;
if (canInstallSnapshot(followerNextIndex)) {
log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
- + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName, followerId,
followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
context.getReplicatedLog().size());
// It doesn't seem like we should ever reach here - most likely indicates something is
// wrong.
log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
- + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName,
followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
context.getReplicatedLog().size());
}
}
final var firstEntry = entries.get(0);
- log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), firstEntry.getData().size(),
+ log.debug("{}: Log entry size {} exceeds max payload size {}", logName, firstEntry.getData().size(),
maxDataSize);
// If an AppendEntries has already been serialized for the log index then reuse the
getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
- log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+ log.debug("{}: Serializing {} for slicing for follower {}", logName, appendEntries,
followerLogInfo.getId());
try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
out.writeObject(appendEntries);
} catch (IOException e) {
- log.error("{}: Error serializing {}", logName(), appendEntries, e);
+ log.error("{}: Error serializing {}", logName, appendEntries, e);
fileBackedStream.cleanup();
return List.of();
}
sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
fileBackedStream.setOnCleanupCallback(index -> {
- log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+ log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName, index);
sharedSerializedAppendEntriesStreams.remove(index);
}, logIndex);
} else {
- log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+ log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName, followerLogInfo.getId());
fileBackedStream.incrementUsageCount();
}
- log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+ log.debug("{}: Slicing stream for index {}, follower {}", logName, logIndex, followerLogInfo.getId());
// Record that slicing is in progress for the follower.
followerLogInfo.setSlicedLogEntryIndex(logIndex);
appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
.fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
.onFailureCallback(failure -> {
- log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+ log.error("{}: Error slicing AppendEntries for follower {}", logName,
followerLogInfo.getId(), failure);
followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
}).build());
followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
if (!entries.isEmpty() || log.isTraceEnabled()) {
- log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
+ log.debug("{}: Sending AppendEntries to follower {}: {}", logName, followerLogInformation.getId(),
appendEntries);
}
getReplicatedToAllIndex(), followerId);
if (captureInitiated) {
followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
- context.getConfigParams().getMaximumMessageSliceSize(), logName()));
+ context.getConfigParams().getMaximumMessageSliceSize(), logName));
}
return captureInitiated;
private void sendInstallSnapshot() {
- log.debug("{}: sendInstallSnapshot", logName());
+ log.debug("{}: sendInstallSnapshot", logName);
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
installSnapshotState = new LeaderInstallSnapshotState(
- context.getConfigParams().getMaximumMessageSliceSize(), logName());
+ context.getConfigParams().getMaximumMessageSliceSize(), logName);
followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
- log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName, followerLogInfo.getId(),
nextSnapshotChunk.length);
int nextChunkIndex = installSnapshotState.incrementChunkIndex();
sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
- log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName, followerActor.path(),
installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
} catch (IOException e) {
- log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
+ log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName,
installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
installSnapshotState, e);
installSnapshotState.reset();
if (!snapshotHolder.isPresent()) {
// Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
// can restart from the next AppendEntries.
- log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+ log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName);
followerLogInfo.clearLeaderInstallSnapshotState();
return false;
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- log.trace("{}: Sending heartbeat", logName());
+ log.trace("{}: Sending heartbeat", logName);
sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toNanos(), true);
appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
.map(PeerInfo::getId)
.collect(ImmutableList.toImmutableList());
- log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
+ log.debug("{}: Election: Candidate has following voting peers: {}", logName, votingPeers);
votesRequired = getMajorityVoteCount(votingPeers.size());
}
@Override
- protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
-
- log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+ RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+ log.debug("{}: handleAppendEntries: {}", logName, appendEntries);
// Some other candidate for the same term became a leader and sent us an append entry
if (currentTerm() == appendEntries.getTerm()) {
log.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
- logName(), appendEntries.getLeaderId(), currentTerm());
+ logName, appendEntries.getLeaderId(), currentTerm());
return switchBehavior(new Follower(context));
}
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
- final AppendEntriesReply appendEntriesReply) {
+ RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
return this;
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
- log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
+ RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
+ log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName, requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
voteCount++;
if (voteCount >= votesRequired) {
if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
log.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
- logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
+ logName, context.getLastApplied(), context.getReplicatedLog().lastIndex());
return internalSwitchBehavior(RaftState.PreLeader);
} else {
return internalSwitchBehavior(RaftState.Leader);
}
@Override
- protected FiniteDuration electionDuration() {
+ FiniteDuration electionDuration() {
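+ // shorten the base timeout by the configured divisor so a failed candidate election is retried more quickly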
return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
}
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof ElectionTimeout) {
- log.debug("{}: Received ElectionTimeout", logName());
+ log.debug("{}: Received ElectionTimeout", logName);
if (votesRequired == 0) {
// If there are no peers then we should be a Leader
if (message instanceof RaftRPC rpc) {
- log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+ log.debug("{}: RaftRPC message received {}, my term is {}", logName, rpc,
context.getTermInformation().getCurrentTerm());
// If RPC request or response contains term T > currentTerm:
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
log.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+ logName, rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
long newTerm = currentTerm + 1;
context.getTermInformation().updateAndPersist(newTerm, context.getId());
- log.info("{}: Starting new election term {}", logName(), newTerm);
+ log.info("{}: Starting new election term {}", logName, newTerm);
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
context.getReplicatedLog().lastIndex(),
context.getReplicatedLog().lastTerm());
- log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+ log.debug("{}: Sending {} to peer {}", logName, requestVote, peerId);
peerActor.tell(requestVote, context.getActor());
}
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
.getSyncIndexThreshold());
- appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
- .fileBackedStreamFactory(context.getFileBackedOutputStreamFactory())
- .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
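+ // reassembles sliced AppendEntries and feeds the restored message back through handleMessage()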
+ appendEntriesMessageAssembler = MessageAssembler.builder()
+ .logContext(logName)
+ .fileBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> handleMessage(sender, message))
+ .build();
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(TimeoutNow.INSTANCE, actor());
}
@Override
- protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+ RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
int numLogEntries = appendEntries.getEntries().size();
if (log.isTraceEnabled()) {
- log.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+ log.trace("{}: handleAppendEntries: {}", logName, appendEntries);
} else if (log.isDebugEnabled() && numLogEntries > 0) {
- log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+ log.debug("{}: handleAppendEntries: {}", logName, appendEntries);
}
if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
- + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+ + "AppendEntries leaderId {}", logName, snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
closeSnapshotTracker();
}
lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
appendEntries.getLeaderRaftVersion());
- log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ log.debug("{}: snapshot install is in progress, replying immediately with {}", logName, reply);
sender.tell(reply, actor());
return this;
}
if (prevCommitIndex != context.getCommitIndex()) {
- log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
+ log.debug("{}: Commit index set to {}", logName, context.getCommitIndex());
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
appendEntries.getLeaderRaftVersion());
if (log.isTraceEnabled()) {
- log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+ log.trace("{}: handleAppendEntries returning : {}", logName, reply);
} else if (log.isDebugEnabled() && numLogEntries > 0) {
- log.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+ log.debug("{}: handleAppendEntries returning : {}", logName, reply);
}
// Reply to the leader before applying any previous state so as not to hold up leader consensus.
if (appendEntries.getLeaderCommit() > context.getLastApplied() && context.getLastApplied() < lastIndex) {
if (log.isDebugEnabled()) {
log.debug("{}: applyLogToStateMachine, appendEntries.getLeaderCommit(): {}, "
- + "context.getLastApplied(): {}, lastIndex(): {}", logName(),
+ + "context.getLastApplied(): {}, lastIndex(): {}", logName,
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex);
}
return true;
}
- log.debug("{}: Number of entries to be appended = {}", logName(), numLogEntries);
+ log.debug("{}: Number of entries to be appended = {}", logName, numLogEntries);
long lastIndex = lastIndex();
int addEntriesFrom = 0;
long existingEntryTerm = getLogEntryTerm(matchEntry.index());
- log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
+ log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName, matchEntry,
existingEntryTerm);
// existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
log.info("{}: Removing entries from log starting at {}, commitIndex: {}, lastApplied: {}",
- logName(), matchEntry.index(), context.getCommitIndex(), context.getLastApplied());
+ logName, matchEntry.index(), context.getCommitIndex(), context.getLastApplied());
// Entries do not match so remove all subsequent entries but only if the existing entries haven't
// been applied to the state yet.
// so we must send back a reply to force a snapshot to completely re-sync the
// follower's log and state.
- log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
+ log.info("{}: Could not remove entries - sending reply to force snapshot", logName);
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
appendEntries.getLeaderRaftVersion()), actor());
}
lastIndex = lastIndex();
- log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), lastIndex,
+ log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName, lastIndex,
addEntriesFrom);
// When persistence successfully completes for each new log entry appended, we need to determine if we
for (int i = addEntriesFrom; i < numLogEntries; i++) {
ReplicatedLogEntry entry = entries.get(i);
- log.debug("{}: Append entry to log {}", logName(), entry.getData());
+ log.debug("{}: Append entry to log {}", logName, entry.getData());
replLog.appendAndPersist(entry, appendAndPersistCallback, false);
}
}
- log.debug("{}: Log size is now {}", logName(), replLog.size());
+ log.debug("{}: Log size is now {}", logName, replLog.size());
return true;
}
// The follower's log is out of sync because the leader does have an entry at prevLogIndex and this
// follower has no entries in its log.
- log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
+ log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName,
appendEntries.getPrevLogIndex());
sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
// in the follower's log or snapshot but it has a different term.
final var replLog = context.getReplicatedLog();
log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append "
- + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ + "entries prevLogTerm {} - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName,
appendEntries.getPrevLogIndex(), leadersPrevLogTermInFollowersLogOrSnapshot,
appendEntries.getPrevLogTerm(), lastIndex, replLog.getSnapshotIndex(),
replLog.getSnapshotTerm());
// The follower's log is out of sync because the Leader's prevLogIndex entry was not found in its log
final var replLog = context.getReplicatedLog();
log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, "
- + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+ + "snapshotIndex: {}, snapshotTerm: {}", logName, appendEntries.getPrevLogIndex(), lastIndex,
replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
// the previous entry in its in-memory journal
final var replLog = context.getReplicatedLog();
log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the "
- + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName,
appendEntries.getReplicatedToAllIndex(), lastIndex, replLog.getSnapshotIndex(),
replLog.getSnapshotTerm());
if (entries.size() > 0 && !isLogEntryPresent(entries.get(0).index() - 1)) {
final var replLog = context.getReplicatedLog();
log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
- + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName(),
+ + "in-memory journal - lastIndex: {}, snapshotIndex: {}, snapshotTerm: {}", logName,
entries.get(0).index() - 1, lastIndex, replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
leaderRaftVersion);
- log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
+ log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName, reply);
sender.tell(reply, actor());
}
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
- final AppendEntriesReply appendEntriesReply) {
+ RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
return this;
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender,
- final RequestVoteReply requestVoteReply) {
+ RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
return this;
}
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+ logName, rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
}
if (canStartElection()) {
if (message instanceof TimeoutNow) {
- log.debug("{}: Received TimeoutNow - switching to Candidate", logName());
+ log.debug("{}: Received TimeoutNow - switching to Candidate", logName);
return internalSwitchBehavior(RaftState.Candidate);
} else if (noLeaderMessageReceived) {
// Check the cluster state to see if the leader is known to be up before we go to Candidate.
// to Candidate,
long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
- log.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+ log.debug("{}: Received ElectionTimeout but leader appears to be available", logName);
scheduleElection(electionDuration());
} else if (isThisFollowerIsolated()) {
- log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName());
+ log.debug("{}: this follower is isolated. Do not switch to Candidate for now.", logName);
setLeaderId(null);
scheduleElection(electionDuration());
} else {
- log.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ log.debug("{}: Received ElectionTimeout - switching to Candidate", logName);
return internalSwitchBehavior(RaftState.Candidate);
}
} else {
log.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
- logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
+ logName, lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
scheduleElection(electionDuration());
}
} else if (message instanceof ElectionTimeout) {
CurrentClusterState state = cluster.orElseThrow().state();
Set<Member> unreachable = state.getUnreachable();
- log.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+ log.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName, leaderAddress,
unreachable);
for (Member m: unreachable) {
if (leaderAddress.equals(m.address())) {
- log.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+ log.info("{}: Leader {} is unreachable", logName, leaderAddress);
return false;
}
}
for (Member m: state.getMembers()) {
if (leaderAddress.equals(m.address())) {
if (m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
- log.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+ log.debug("{}: Leader {} cluster status is {} - leader is available", logName,
leaderAddress, m.status());
return true;
} else {
- log.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+ log.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName,
leaderAddress, m.status());
return false;
}
}
}
- log.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+ log.debug("{}: Leader {} not found in the cluster member set", logName, leaderAddress);
return false;
}
final Iterable<Member> members = state.getMembers();
log.debug("{}: Checking if this node is isolated in the cluster unreachable set {},"
- + "all members {} self member: {}", logName(), unreachable, members, selfMember);
+ + "all members {} self member: {}", logName, unreachable, members, selfMember);
// no unreachable peers means we cannot be isolated
if (unreachable.isEmpty()) {
}
private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) {
- log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
+ log.debug("{}: handleInstallSnapshot: {}", logName, installSnapshot);
// update leader
leaderId = installSnapshot.getLeaderId();
isLastChunk = snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode());
} catch (IOException e) {
- log.debug("{}: failed to add InstallSnapshot chunk", logName(), e);
+ log.debug("{}: failed to add InstallSnapshot chunk", logName, e);
closeSnapshotTracker();
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
return;
final var successReply = new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), true);
if (!isLastChunk) {
- log.debug("{}: handleInstallSnapshot returning: {}", logName(), successReply);
+ log.debug("{}: handleInstallSnapshot returning: {}", logName, successReply);
sender.tell(successReply, actor());
return;
}
// TODO: this message is confusing: the snapshot is *received*, not installed yet
- log.info("{}: Snapshot installed from leader: {}", logName(), leaderId);
+ log.info("{}: Snapshot installed from leader: {}", logName, leaderId);
final Snapshot.State snapshotState;
try {
snapshotState = context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes());
} catch (IOException e) {
- log.debug("{}: failed to convert InstallSnapshot to state", logName(), e);
+ log.debug("{}: failed to convert InstallSnapshot to state", logName, e);
closeSnapshotTracker();
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
return;
}
- log.debug("{}: Converted InstallSnapshot from leader: {} to state{}", logName(), leaderId,
+ log.debug("{}: Converted InstallSnapshot from leader: {} to state{}", logName, leaderId,
snapshotState.needsMigration() ? " (needs migration)" : "");
final var snapshot = Snapshot.create(snapshotState, List.of(),
final var applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
public void onSuccess() {
- log.debug("{}: handleInstallSnapshot returning: {}", logName(), successReply);
+ log.debug("{}: handleInstallSnapshot returning: {}", logName, successReply);
sender.tell(successReply, actor());
}
// we received an AppendEntries reply, so we should switch the Behavior to Leader
@Override
- protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
- final AppendEntriesReply appendEntriesReply) {
+ RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
RaftActorBehavior ret = super.handleAppendEntriesReply(sender, appendEntriesReply);
// it can happen that this isolated leader interacts with a new leader in the cluster and
if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
- log.debug("{}: Leadership transfer expired", logName());
+ log.debug("{}: Leadership transfer expired", logName);
leadershipTransferContext = null;
}
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
- final AppendEntriesReply appendEntriesReply) {
+ RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
return returnBehavior;
* @param leadershipTransferCohort the cohort participating in the leadership transfer
*/
public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
- log.debug("{}: Attempting to transfer leadership", logName());
+ log.debug("{}: Attempting to transfer leadership", logName);
leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
boolean isVoting = context.getPeerInfo(followerId).isVoting();
log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
- logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
+ logName, followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
- log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
+ log.debug("{}: Follower's log matches - sending ElectionTimeout", logName);
// We can't be sure if the follower has applied all its log entries to its state so send an
// additional AppendEntries with the latest commit index.
ActorSelection followerActor = context.getPeerActorSelection(followerId);
followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
- log.debug("{}: Leader transfer complete", logName());
+ log.debug("{}: Leader transfer complete", logName);
leadershipTransferContext.transferCohort.transferComplete();
leadershipTransferContext = null;
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof ApplyState) {
- log.debug("{}: Received {} - lastApplied: {}, lastIndex: {}", logName(), message, context.getLastApplied(),
+ log.debug("{}: Received {} - lastApplied: {}, lastIndex: {}", logName, message, context.getLastApplied(),
context.getReplicatedLog().lastIndex());
if (context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
// We've applied all entries - we can switch to Leader.
/**
* Information about the RaftActor whose behavior this class represents.
*/
- protected final @NonNull RaftActorContext context;
+ final @NonNull RaftActorContext context;
/**
* The RaftState corresponding to this behavior.
* Used for message logging.
*/
@SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
- protected final Logger log;
+ final @NonNull Logger log;
/**
* Prepended to log messages to provide appropriate context.
*/
- private final String logName;
+ final @NonNull String logName;
/**
* Used to cancel a scheduled election.
this.context = requireNonNull(context);
this.state = requireNonNull(state);
log = context.getLogger();
- logName = "%s (%s)".formatted(context.getId(), state);
+ logName = context.getId() + " (" + state + ")";
}
public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) {
*/
public abstract short getLeaderPayloadVersion();
- protected final String logName() {
- return logName;
- }
-
/**
* Sets the index of the last log entry that has been replicated to all peers.
*
return replicatedToAllIndex;
}
+ final String getId() {
+ return context.getId();
+ }
+
/**
- * Derived classes should not directly handle AppendEntries messages it
- * should let the base class handle it first. Once the base class handles
- * the AppendEntries message and does the common actions that are applicable
- * in all RaftState's it will delegate the handling of the AppendEntries
- * message to the derived class to do more state specific handling by calling
- * this method
+ * Derived classes should not directly handle AppendEntries messages; they should let the base class handle it
+ * first. Once the base class handles the AppendEntries message and performs the common actions applicable in all
+ * RaftStates, it delegates the handling of the AppendEntries message to the derived class for more state-specific
+ * handling by calling this method.
*
* @param sender The actor that sent this message
* @param appendEntries The AppendEntries message
* @return a new behavior if it was changed or the current behavior
*/
- protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries);
+ abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries);
/**
* Handles the common logic for the AppendEntries message and delegates handling to the derived class.
* @param appendEntries the message
* @return a new behavior if it was changed or the current behavior
*/
- protected RaftActorBehavior appendEntries(final ActorRef sender, final AppendEntries appendEntries) {
-
+ final RaftActorBehavior appendEntries(final ActorRef sender, final AppendEntries appendEntries) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
- log.info("{}: Cannot append entries because sender's term {} is less than {}", logName(),
+ log.info("{}: Cannot append entries because sender's term {} is less than {}", logName,
appendEntries.getTerm(), currentTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
return this;
}
-
return handleAppendEntries(sender, appendEntries);
}
/**
- * Derived classes should not directly handle AppendEntriesReply messages it
- * should let the base class handle it first. Once the base class handles
- * the AppendEntriesReply message and does the common actions that are
- * applicable in all RaftState's it will delegate the handling of the
- * AppendEntriesReply message to the derived class to do more state specific
- * handling by calling this method
+ * Derived classes should not directly handle AppendEntriesReply messages; they should let the base class handle
+ * it first. Once the base class handles the AppendEntriesReply message and performs the common actions applicable
+ * in all RaftStates, it delegates the handling of the AppendEntriesReply message to the derived class for more
+ * state-specific handling by calling this method.
*
* @param sender The actor that sent this message
* @param appendEntriesReply The AppendEntriesReply message
* @return a new behavior if it was changed or the current behavior
*/
- protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply);
+ abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply);
/**
* Handles the logic for the RequestVote message that is common for all behaviors.
* @param requestVote the message
* @return a new behavior if it was changed or the current behavior
*/
- protected RaftActorBehavior requestVote(final ActorRef sender, final RequestVote requestVote) {
-
- log.debug("{}: In requestVote: {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName(),
+ final RaftActorBehavior requestVote(final ActorRef sender, final RequestVote requestVote) {
+ log.debug("{}: In requestVote: {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName,
requestVote, currentTerm(), votedFor(), lastIndex(), lastTerm());
- boolean grantVote = canGrantVote(requestVote);
-
+ final var grantVote = canGrantVote(requestVote);
if (grantVote) {
context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId());
}
- RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
-
- log.debug("{}: requestVote returning: {}", logName(), reply);
-
+ final var reply = new RequestVoteReply(currentTerm(), grantVote);
+ log.debug("{}: requestVote returning: {}", logName, reply);
sender.tell(reply, actor());
-
return this;
}
- protected boolean canGrantVote(final RequestVote requestVote) {
+ final boolean canGrantVote(final RequestVote requestVote) {
boolean grantVote = false;
// Reply false if term < currentTerm (§5.1)
* @param requestVoteReply The RequestVoteReply message
* @return a new behavior if it was changed or the current behavior
*/
- protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply);
+ abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply);
/**
* Returns a duration for election with an additional variance for randomness.
*
* @return a random election duration
*/
- protected FiniteDuration electionDuration() {
+ FiniteDuration electionDuration() {
long variance = ThreadLocalRandom.current().nextInt(context.getConfigParams().getElectionTimeVariance());
- return context.getConfigParams().getElectionTimeOutInterval().$plus(
- new FiniteDuration(variance, TimeUnit.MILLISECONDS));
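+ // randomizing the timeout (raft §5.2) reduces the chance of repeated split votes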
+ return context.getConfigParams().getElectionTimeOutInterval()
+ .$plus(new FiniteDuration(variance, TimeUnit.MILLISECONDS));
}
/**
* Stops the currently scheduled election.
*/
- protected void stopElection() {
+ final void stopElection() {
if (electionCancel != null && !electionCancel.isCancelled()) {
electionCancel.cancel();
}
}
- protected boolean canStartElection() {
+ final boolean canStartElection() {
return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember();
}
* @param interval the duration after which we should trigger a new election
*/
// Non-final for testing
- protected void scheduleElection(final FiniteDuration interval) {
+ void scheduleElection(final FiniteDuration interval) {
stopElection();
// Schedule an election. When the scheduler triggers, an ElectionTimeout message is sent to itself
*
* @return the current term
*/
- protected long currentTerm() {
+ final long currentTerm() {
return context.getTermInformation().getCurrentTerm();
}
*
* @return the candidate for whom we voted in the current term
*/
- protected String votedFor() {
+ final String votedFor() {
return context.getTermInformation().getVotedFor();
}
*
* @return the actor
*/
- protected final ActorRef actor() {
+ final ActorRef actor() {
return context.getActor();
}
*
* @return the term
*/
- protected long lastTerm() {
+ final long lastTerm() {
return context.getReplicatedLog().lastTerm();
}
*
* @return the index
*/
- protected long lastIndex() {
+ final long lastIndex() {
return context.getReplicatedLog().lastIndex();
}
*
* @return the log entry index or -1 if not found
*/
- protected long getLogEntryIndex(final long index) {
+ final long getLogEntryIndex(final long index) {
final var replLog = context.getReplicatedLog();
if (index == replLog.getSnapshotIndex()) {
return index;
*
* @return the log entry term or -1 if not found
*/
- protected long getLogEntryTerm(final long index) {
+ final long getLogEntryTerm(final long index) {
final var replLog = context.getReplicatedLog();
if (index == replLog.getSnapshotIndex()) {
return replLog.getSnapshotTerm();
*
* @return the term or -1 otherwise
*/
- protected long getLogEntryOrSnapshotTerm(final long index) {
+ final long getLogEntryOrSnapshotTerm(final long index) {
final var replLog = context.getReplicatedLog();
return replLog.isInSnapshot(index) ? replLog.getSnapshotTerm() : getLogEntryTerm(index);
}
*
* @param index the log index
*/
- protected void applyLogToStateMachine(final long index) {
+ final void applyLogToStateMachine(final long index) {
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
- log.debug("{}: Setting last applied to {}", logName(), i);
+ log.debug("{}: Setting last applied to {}", logName, i);
context.setLastApplied(i);
context.getApplyStateConsumer().accept(applyState);
} else {
// if one index is not present in the log, no point in looping
// around as the rest won't be present either
- log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
- logName(), i, i, index);
+ log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", logName, i, i, index);
break;
}
}
@Override
public abstract void close();
- protected RaftActorBehavior internalSwitchBehavior(final RaftState newState) {
+ final RaftActorBehavior internalSwitchBehavior(final RaftState newState) {
return internalSwitchBehavior(createBehavior(context, newState));
}
@SuppressWarnings("checkstyle:IllegalCatch")
- protected RaftActorBehavior internalSwitchBehavior(final RaftActorBehavior newBehavior) {
+ final RaftActorBehavior internalSwitchBehavior(final RaftActorBehavior newBehavior) {
if (!context.getRaftPolicy().automaticElectionsEnabled()) {
return this;
}
- log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), state(),
+ log.info("{} :- Switching from behavior {} to {}, election term: {}", logName, state(),
newBehavior.state(), context.getTermInformation().getCurrentTerm());
try {
close();
} catch (RuntimeException e) {
- log.error("{}: Failed to close behavior : {}", logName(), state(), e);
+ log.error("{}: Failed to close behavior : {}", logName, state(), e);
}
return newBehavior;
}
-
- protected int getMajorityVoteCount(final int numPeers) {
+ static int getMajorityVoteCount(final int numPeers) {
// Votes are required from a majority of the peers including self.
// The numMajority field therefore stores a calculated value
// of the number of votes required for this candidate to win an
numMajority = (numPeers + self) / 2 + 1;
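+ // e.g. 4 peers with self == 1: (4 + 1) / 2 + 1 == 3 votes required out of 5 nodes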
}
return numMajority;
-
}
-
/**
* Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or
* lastApplied-1, whichever is smaller.
*
* @param snapshotCapturedIndex the index from which to clear
*/
- protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
+ final void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
if (actualIndex != -1) {
}
}
- protected final String getId() {
- return context.getId();
- }
-
// Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
// messages, as the candidate is not able to receive our response.
- protected boolean shouldUpdateTerm(final RaftRPC rpc) {
+ final boolean shouldUpdateTerm(final RaftRPC rpc) {
if (!(rpc instanceof RequestVote requestVote)) {
return true;
}
- log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
+ log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName);
final Optional<Cluster> maybeCluster = context.getCluster();
if (!maybeCluster.isPresent()) {
return true;
final Cluster cluster = maybeCluster.orElseThrow();
final Set<Member> unreachable = cluster.state().getUnreachable();
- log.debug("{}: Cluster state: {}", logName(), unreachable);
+ log.debug("{}: Cluster state: {}", logName, unreachable);
for (Member member : unreachable) {
for (String role : member.getRoles()) {
if (requestVote.getCandidateId().startsWith(role)) {
- log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
+ log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName,
member, requestVote);
return false;
}
}
}
- log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),
+ log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName,
requestVote);
return true;
}
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName);
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName);
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {