private Optional<ByteString> snapshot;
- private long replicatedToAllIndex = -1;
-
public AbstractLeader(RaftActorContext context) {
- super(context);
+ super(context, RaftState.Leader);
final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- context.getCommitIndex(), -1,
- context.getConfigParams().getElectionTimeOutInterval());
+ new FollowerLogInformationImpl(followerId, -1, context);
ftlBuilder.put(followerId, followerLogInformation);
}
leaderId = context.getId();
- LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
+ LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- sendAppendEntries(0);
+ sendAppendEntries(0, false);
+
+ // It is important to schedule this heartbeat here
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
/**
return followerToLog.keySet();
}
- private Optional<ByteString> getSnapshot() {
- return snapshot;
- }
-
@VisibleForTesting
void setSnapshot(Optional<ByteString> snapshot) {
this.snapshot = snapshot;
protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
- }
+ LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
return this;
}
protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- if(! appendEntriesReply.isSuccess()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
- }
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
+ } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) {
+ LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
}
// Update the FollowerLogInformation
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
return this;
}
+ if(followerLogInformation.timeSinceLastActivity() >
+ context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+ LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " +
+ "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+ logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+ context.getLastApplied(), context.getCommitIndex());
+ }
+
followerLogInformation.markFollowerActive();
if (appendEntriesReply.isSuccess()) {
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
+ LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}",
+ logName(), context.getCommitIndex(), context.getLastApplied());
+
applyLogToStateMachine(context.getCommitIndex());
}
}
//Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
- sendUpdatesToFollower(followerId, followerLogInformation, false);
+ sendUpdatesToFollower(followerId, followerLogInformation, false, false);
return this;
}
private void purgeInMemoryLog() {
- //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ //find the lowest index across followers which has been replicated to all.
+ // lastApplied if there are no followers, so that we keep clearing the log for single-node
// we would delete the in-mem log from that index on, in-order to minimize mem usage
// we would also share this info thru AE with the followers so that they can delete their log entries as well.
- long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
for (FollowerLogInformation info : followerToLog.values()) {
minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
}
- replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
}
@Override
return this;
}
- @Override
- public RaftState state() {
- return RaftState.Leader;
- }
+ protected void beforeSendHeartbeat(){}
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
- rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
}
}
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
+ if (message instanceof SendHeartBeat) {
+ beforeSendHeartbeat();
+ sendHeartBeat();
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ return this;
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply((InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+
return super.handleMessage(sender, message);
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+ LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+
String followerId = reply.getFollowerId();
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
- context.getId(), followerId);
+ logName(), followerId);
return;
}
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- context.getId(), reply.getChunkIndex(), followerId,
+ "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
+ logName(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
context.getReplicatedLog().getSnapshotIndex() + 1);
mapFollowerToSnapshot.remove(followerId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
- context.getId(), followerToLog.get(followerId).getNextIndex());
- }
+ LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
+ logName(), followerId, followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
}
} else {
LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- context.getId(), reply.getChunkIndex());
+ logName(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
- context.getId(), reply.getChunkIndex(), followerId,
+ logName(), reply.getChunkIndex(), followerId,
followerToSnapshot.getChunkIndex());
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
- }
+ LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
+ replicate.getIdentifier(), logIndex);
// Create a tracker entry we will use this later to notify the
// client actor
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
- sendAppendEntries(0);
+ sendAppendEntries(0, false);
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval) {
+ private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
// This checks helps not to send a repeat message to the follower
if(!followerLogInformation.isFollowerActive() ||
followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
- sendUpdatesToFollower(followerId, followerLogInformation, true);
+ sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
}
}
}
*/
private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
- boolean sendHeartbeat) {
+ boolean sendHeartbeat, boolean isHeartbeat) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+ if(!isHeartbeat || LOG.isTraceEnabled()) {
+ LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, leaderLastIndex, leaderSnapShotIndex);
+ }
+
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+ LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ followerNextIndex, followerId);
+
// FIXME : Sending one entry at a time
final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex) {
+ leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
if (LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ "follower-nextIndex: %d, leader-snapshot-index: %d, " +
+ "leader-last-index: %d", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
}
// Send heartbeat to follower whenever install snapshot is initiated.
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), replicatedToAllIndex);
+ context.getCommitIndex(), super.getReplicatedToAllIndex());
- if(!entries.isEmpty()) {
- LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+ if(!entries.isEmpty() || LOG.isTraceEnabled()) {
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
appendEntries);
}
}
/**
- * /**
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
* 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
* @param followerNextIndex
*/
private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
- }
-
if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
} else if (!context.isSnapshotCaptureInitiated()) {
- LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
if (lastAppliedEntry != null) {
lastAppliedIndex = lastAppliedEntry.getIndex();
lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
}
boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
+ long replicatedToAllIndex = super.getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+ isInstallSnapshotInitiated), actor());
context.setSnapshotCaptureInitiated(true);
}
}
private void sendInstallSnapshot() {
+ LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
- followerToSnapshot.getTotalChunks(),
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
);
LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- context.getId(), followerActor.path(),
+ logName(), followerActor.path(),
followerToSnapshot.getChunkIndex(),
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
+ LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
}
}
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
- }
+
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+
return nextChunk;
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
+ LOG.trace("{}: Sending heartbeat", logName());
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
}
}
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- context.getId(), size, totalChunks);
+ logName(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
+
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
snapshotLength, start, size);
- }
+
ByteString substring = getSnapshotBytes().substring(start, start + size);
nextChunkHashCode = substring.hashCode();
return substring;