private Optional<ByteString> snapshot;
- private long replicatedToAllIndex = -1;
-
public AbstractLeader(RaftActorContext context) {
super(context);
}
private void purgeInMemoryLog() {
- //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ //find the lowest index across followers which has been replicated to all.
+ // lastApplied if there are no followers, so that we keep clearing the log for single-node
// we would delete the in-mem log from that index on, in-order to minimize mem usage
// we would also share this info thru AE with the followers so that they can delete their log entries as well.
- long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
for (FollowerLogInformation info : followerToLog.values()) {
minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
}
- replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
}
@Override
sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex) {
+ leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), replicatedToAllIndex);
+ context.getCommitIndex(), super.getReplicatedToAllIndex());
if(!entries.isEmpty()) {
LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
}
/**
- * /**
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
* 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
* @param followerNextIndex
*/
private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
- }
-
if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
if (lastAppliedEntry != null) {
lastAppliedIndex = lastAppliedEntry.getIndex();
lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
}
boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
+ long replicatedToAllIndex = super.getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+ isInstallSnapshotInitiated), actor());
context.setSnapshotCaptureInitiated(true);
}
}
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
- followerToSnapshot.getTotalChunks(),
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()