public void removeFollower(String followerId) {
followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+ }
+
for (long N = context.getCommitIndex() + 1; ; N++) {
int replicatedCount = 1;
+ LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
- if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+ if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
replicatedCount++;
+ } else if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ info.getMatchIndex(), peerInfo);
}
}
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+ }
+
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
+ if (replicatedLogEntry == null) {
+ LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+ break;
+ }
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ LOG.trace("{}: Setting commit index to {}", logName(), N);
context.setCommitIndex(N);
} else {
- break;
+ LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+ logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
+ LOG.trace("{}: minReplicationCount not reached - breaking", logName());
break;
}
}