/**
 * Removes the given follower from the tracking maps.
 * The entry in followerToLog is removed; the added (+) line also removes
 * the follower's entry from mapFollowerToSnapshot.
 *
 * @param followerId key of the follower to remove from both maps
 */
public void removeFollower(String followerId) {
followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
}
// NOTE(review): this span looks like several diff hunks joined together: the signature
// takes no parameters, yet the body uses followerId and reply, and the trace message
// below names handleAppendEntriesReply. Confirm against the full file which method each
// part belongs to before relying on the comments here.
public void updateMinReplicaCount() {
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+ }
+
// Walk candidate indexes upward from commitIndex + 1. The loop header has no
// termination condition; it exits only through the break statements below.
for (long N = context.getCommitIndex() + 1; ; N++) {
// Starts at 1 rather than 0 - presumably to count the leader itself; TODO confirm.
int replicatedCount = 1;
+ LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex() >= N) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
// After the patch a follower is counted only when its matchIndex has reached N
// AND its PeerInfo exists and reports isVoting().
+ if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
replicatedCount++;
+ } else if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ info.getMatchIndex(), peerInfo);
}
}
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+ }
+
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
// After the patch a missing entry at index N is logged and ends the scan.
+ if (replicatedLogEntry == null) {
+ LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+ break;
+ }
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ LOG.trace("{}: Setting commit index to {}", logName(), N);
context.setCommitIndex(N);
} else {
// The removed break is replaced by a debug log, so the scan now continues to N + 1
// when the entry's term differs from the current term.
- break;
+ LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+ logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
+ LOG.trace("{}: minReplicationCount not reached - breaking", logName());
break;
}
}
// Look up the snapshot-install state recorded for this follower; log an error and
// return if there is none.
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ if(followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
// Also discard the follower's entry in mapFollowerToSnapshot before returning.
+ mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
followerLogInformation.markFollowerActive();
// NOTE(review): the body of this branch is empty in this view - presumably elided
// between diff hunks; confirm against the full file.
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
* then send the existing snapshot in chunks to the follower.
* @param followerId
*/
- public void initiateCaptureSnapshot(String followerId) {
+ public boolean initiateCaptureSnapshot(String followerId) {
// The patch changes the return type from void to boolean: true on the snapshot-present
// path, otherwise whatever captureToInstall(...) returns.
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
// This could happen if another follower needs an install when one is going on.
// Resolve the follower's actor selection and hand it to sendSnapshotChunk.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
-
-
+ return true;
} else {
// No snapshot is held: delegate to the snapshot manager, passing the last entry of the
// replicated log, the replicated-to-all index and the follower id.
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
}
}