public AbstractLeader(RaftActorContext context) {
super(context, RaftState.Leader);
+ setLeaderPayloadVersion(context.getPayloadVersion());
+
final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
}
followerLogInformation.markFollowerActive();
+ followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
- updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
- updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
-
- if(updated && LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(),
- followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
- }
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+ if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+ // The follower's log is empty or the last entry is present in the leader's journal
+ // and the terms match so the follower is just behind the leader's journal from
+ // the last snapshot, if any. We'll catch up the follower quickly by starting at the
+ // follower's last log index.
+
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ } else {
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about.
- followerLogInformation.decrNextIndex();
+ followerLogInformation.decrNextIndex();
+ }
}
// Now figure out if this reply warrants a change in the commitIndex
return this;
}
+ private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
+ AppendEntriesReply appendEntriesReply) {
+ boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+ updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+ if(updated && LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+ logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
+ }
+ return updated;
+ }
+
private void purgeInMemoryLog() {
//find the lowest index across followers which has been replicated to all.
// lastApplied if there are no followers, so that we keep clearing the log for single-node
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
boolean sendAppendEntries = false;
- List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
- LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- // FIXME : Sending one entry at a time
if(followerLogInformation.okToReplicate()) {
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ // Try to send all the entries in the journal but not exceeding the max data size
+ // for a single AppendEntries message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+ context.getConfigParams().getSnapshotChunkSize());
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), super.getReplicatedToAllIndex());
+ context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,