private Optional<ByteString> snapshot;
+ private long replicatedToAllIndex = -1;
+
public AbstractLeader(RaftActorContext context) {
super(context);
applyLogToStateMachine(context.getCommitIndex());
}
+ if (!context.isSnapshotCaptureInitiated()) {
+ purgeInMemoryLog();
+ }
+
return this;
}
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ }
+
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
final Iterator<ClientRequestTracker> it = trackerList.iterator();
// set currentTerm = T, convert to follower (ยง5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+ rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
return switchBehavior(new Follower(context));
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+ if (followerToSnapshot == null) {
+ LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ context.getId(), followerId);
+ return;
+ }
+
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
followerLogInformation.markFollowerActive();
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+ if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
followerToSnapshot.markSendStatus(false);
}
-
} else {
- LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
+ LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ context.getId(), reply.getChunkIndex(), followerId,
+ followerToSnapshot.getChunkIndex());
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
private void sendAppendEntries() {
// Send an AppendEntries to all followers
+
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
- if (mapFollowerToSnapshot.get(followerId) != null) {
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ if (followerToSnapshot != null) {
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
} else {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
}
} else {
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
final List<ReplicatedLogEntry> entries;
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
+ LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ context.getId(), leaderLastIndex, leaderSnapShotIndex);
+
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+ LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
+ followerNextIndex, followerId);
+
// FIXME : Sending one entry at a time
entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
entries = Collections.<ReplicatedLogEntry>emptyList();
}
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
}
}
}
}
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
+ List<ReplicatedLogEntry> entries, String followerId) {
+ AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex(), replicatedToAllIndex);
+
+ if(!entries.isEmpty()) {
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+ appendEntries);
+ }
+
+ followerActor.tell(appendEntries.toSerializable(), actor());
}
/**
*
*/
private void installSnapshotIfNeeded() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+ }
+
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+ // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+ // followerId to the followerToSnapshot map.
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks(),
- Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+ nextSnapshotChunk,
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
+ Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
);
LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
context.getId(), followerActor.path(),
- mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ followerToSnapshot.getChunkIndex(),
+ followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());