final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- context.getCommitIndex(), -1,
- context.getConfigParams().getElectionTimeOutInterval());
+ new FollowerLogInformationImpl(followerId, -1, context);
ftlBuilder.put(followerId, followerLogInformation);
}
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (ยง5.2)
sendAppendEntries(0, false);
+
+ // It is important to schedule this heartbeat here
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
/**
*
* @return Collection of follower IDs
*/
- protected final Collection<String> getFollowerIds() {
+ public final Collection<String> getFollowerIds() {
return followerToLog.keySet();
}
if(LOG.isTraceEnabled()) {
LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
- } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) {
- LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
}
// Update the FollowerLogInformation
return this;
}
+ if(followerLogInformation.timeSinceLastActivity() >
+ context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+ LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " +
+ "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+ logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+ context.getLastApplied(), context.getCommitIndex());
+ }
+
followerLogInformation.markFollowerActive();
+ boolean updated = false;
if (appendEntriesReply.isSuccess()) {
- followerLogInformation
- .setMatchIndex(appendEntriesReply.getLogLastIndex());
- followerLogInformation
- .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+ updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+ if(updated && LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(),
+ followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
+ }
} else {
+ LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
// TODO: When we find that the follower is out of sync with the
// Leader we simply decrement that followers next index by 1.
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}",
- logName(), context.getCommitIndex(), context.getLastApplied());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied());
+ }
applyLogToStateMachine(context.getCommitIndex());
}
}
//Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
- sendUpdatesToFollower(followerId, followerLogInformation, false, false);
+ sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
return this;
}
return this;
}
+ protected void beforeSendHeartbeat(){}
+
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
}
}
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
+ if (message instanceof SendHeartBeat) {
+ beforeSendHeartbeat();
+ sendHeartBeat();
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ return this;
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply((InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+
return super.handleMessage(sender, message);
}
followerToSnapshot.markSendStatus(false);
}
- if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if(followerActor != null) {
sendSnapshotChunk(followerActor, followerId);
if (followerActor != null) {
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if(!isHeartbeat || LOG.isTraceEnabled()) {
- LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- logName(), followerId, leaderLastIndex, leaderSnapShotIndex);
+ if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
+ LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
followerNextIndex, followerId);
// FIXME : Sending one entry at a time
- final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+ if(followerLogInformation.okToReplicate()) {
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ sendAppendEntries = true;
+ }
} else if (isFollowerActive && followerNextIndex >= 0 &&
leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
}
// Send heartbeat to follower whenever install snapshot is initiated.
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+ sendAppendEntries = true;
initiateCaptureSnapshot(followerId, followerNextIndex);
} else if(sendHeartbeat) {
- //we send an AppendEntries, even if the follower is inactive
+ // we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
}
+
+ if(sendAppendEntries) {
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ entries, followerId);
+ }
}
}
} else if (!context.isSnapshotCaptureInitiated()) {
- LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
boolean isInstallSnapshotInitiated = true;
long replicatedToAllIndex = super.getReplicatedToAllIndex();
ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated), actor());
+
+ CaptureSnapshot captureSnapshot = new CaptureSnapshot(
+ lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+ isInstallSnapshotInitiated);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
+ captureSnapshot);
+ }
+
+ actor().tell(captureSnapshot, actor());
context.setSnapshotCaptureInitiated(true);
}
}
).toSerializable(),
actor()
);
- LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(),
- followerToSnapshot.getChunkIndex(),
- followerToSnapshot.getTotalChunks());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
+ followerToSnapshot.getTotalChunks());
+ }
}
} catch (IOException e) {
LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);