- byte[] getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = snapshotChunkSize;
- if (snapshotChunkSize > snapshotLength) {
- size = snapshotLength;
- } else if (start + snapshotChunkSize > snapshotLength) {
- size = snapshotLength - start;
+ byte[] getNextChunk() throws IOException {
+ // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until,
+ // markSendStatus() is called with either success or failure
+ final var start = incrementOffset();
+ if (replyStatus || currentChunk == null) {
+ int size = snapshotChunkSize;
+ if (snapshotChunkSize > snapshotSize) {
+ size = (int) snapshotSize;
+ } else if (start + snapshotChunkSize > snapshotSize) {
+ size = (int) (snapshotSize - start);
+ }
+
+ currentChunk = new byte[size];
+ final var numRead = snapshotInputStream.read(currentChunk);
+ if (numRead != size) {
+ throw new IOException(String.format(
+ "The # of bytes read from the input stream, %d, does not match the expected # %d",
+ numRead, size));
+ }
+
+ nextChunkHashCode = Arrays.hashCode(currentChunk);
+
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
+ snapshotSize, start, size, nextChunkHashCode);