* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+ // The index of the first chunk that is sent when installing a snapshot
+ public static final int FIRST_CHUNK_INDEX = 1;
+
+ // The index that the follower should respond with if it needs the install snapshot to be reset
+ public static final int INVALID_CHUNK_INDEX = -1;
+
+ // This would be passed as the hash code of the last chunk when sending the first chunk
+ public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
"sending snapshot chunk failed, Will retry, Chunk:{}",
reply.getChunkIndex()
);
+
followerToSnapshot.markSendStatus(false);
}
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
+
+ if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ // Since the Follower did not find this index to be valid we should reset the follower snapshot
+ // so that Installing the snapshot can resume from the beginning
+ followerToSnapshot.reset();
+ }
}
}
context.getReplicatedLog().getSnapshotTerm(),
getNextSnapshotChunk(followerId,snapshot.get()),
mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+ Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
).toSerializable(),
actor()
);
private boolean replyStatus = false;
private int chunkIndex;
private int totalChunks;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
LOG.debug("Snapshot {} bytes, total chunks to send:{}",
size, totalChunks);
}
+ replyReceivedForOffset = -1;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
public ByteString getSnapshotBytes() {
// if the chunk sent was successful
replyReceivedForOffset = offset;
replyStatus = true;
+ lastChunkHashCode = nextChunkHashCode;
} else {
// if the chunk sent was failure
replyReceivedForOffset = offset;
LOG.debug("length={}, offset={},size={}",
snapshotLength, start, size);
}
- return getSnapshotBytes().substring(start, start + size);
+ ByteString substring = getSnapshotBytes().substring(start, start + size);
+ nextChunkHashCode = substring.hashCode();
+ return substring;
+ }
+
+ /**
+ * reset should be called when the Follower needs to be sent the snapshot from the beginning
+ */
+ public void reset(){
+ offset = 0;
+ replyStatus = false;
+ replyReceivedForOffset = offset;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+ lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ }
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode;
}
}