public void removeFollower(String followerId) {
followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+ if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
replicatedCount++;
}
}
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
- context.setCommitIndex(N);
- } else {
+ if (replicatedLogEntry == null) {
break;
}
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ context.setCommitIndex(N);
+ }
} else {
break;
}
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ if(followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerActor.tell(appendEntries, actor());
}
/**
* then send the existing snapshot in chunks to the follower.
* @param followerId
*/
- public void initiateCaptureSnapshot(String followerId) {
+ public boolean initiateCaptureSnapshot(String followerId) {
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
// This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
-
-
+ return true;
} else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
}
}