// The index that the follower should respond with if it needs the install snapshot to be reset
static final int INVALID_CHUNK_INDEX = -1;
+ static final int INITIAL_OFFSET = -1;
+
// This would be passed as the hash code of the last chunk when sending the first chunk
static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
private final int snapshotChunkSize;
private final String logName;
private ByteSource snapshotBytes;
- private int offset = 0;
+ private int offset = INITIAL_OFFSET;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset = -1;
// if replyStatus is false, the previous chunk is attempted
LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
- replyReceivedForOffset = -1;
+ replyReceivedForOffset = INITIAL_OFFSET;
chunkIndex = FIRST_CHUNK_INDEX;
}
int incrementOffset() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
+ // if offset is -1 doesnt matter whether it was the initial value or reset, move the offset to 0 to begin with
+ if (offset == INITIAL_OFFSET) {
+ offset = 0;
+ } else {
offset = offset + snapshotChunkSize;
}
return offset;
int incrementChunkIndex() {
if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
+ // if prev chunk failed, we would want to send the same chunk again
chunkIndex = chunkIndex + 1;
}
return chunkIndex;
replyStatus = true;
lastChunkHashCode = nextChunkHashCode;
} else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
+ // if the chunk sent was failure, revert offset to previous so we can retry
+ offset = replyReceivedForOffset;
replyStatus = false;
}
}
byte[] getNextChunk() throws IOException {
+ // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until,
+ // markSendStatus() is called with either success or failure
+ int start = incrementOffset();
if (replyStatus || currentChunk == null) {
- int start = incrementOffset();
int size = snapshotChunkSize;
if (snapshotChunkSize > snapshotSize) {
size = (int) snapshotSize;
closeStream();
chunkTimer.reset();
- offset = 0;
+ offset = INITIAL_OFFSET;
replyStatus = false;
- replyReceivedForOffset = offset;
+ replyReceivedForOffset = INITIAL_OFFSET;
chunkIndex = FIRST_CHUNK_INDEX;
currentChunk = null;
lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+ nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
try {
snapshotInputStream = snapshotBytes.openStream();