import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Slice off the entry's bytes
final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
- // Compute the checksum for the entry bytes.
- final var crc32 = new CRC32();
- crc32.update(entryBuffer);
-
// If the stored checksum does not equal the computed checksum, do not proceed further
- final var computed = (int) crc32.getValue();
+ final var computed = SegmentEntry.computeChecksum(entryBuffer);
if (checksum != computed) {
LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
invalidateCache();
// update position
position += SegmentEntry.HEADER_BYTES + length;
- // return bytes
- entryBuffer.rewind();
- return Unpooled.buffer(length).writeBytes(entryBuffer);
+ // rewind and return
+ return Unpooled.buffer(length).writeBytes(entryBuffer.rewind());
}
/**
import io.atomix.storage.journal.index.JournalIndex;
import io.netty.buffer.ByteBuf;
import java.nio.MappedByteBuffer;
-import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
}
// allocate buffer and write data
- final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
- writeBuffer.put(buf.nioBuffer());
+ final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
+ writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
// Compute the checksum for the entry.
- final var crc32 = new CRC32();
- crc32.update(writeBuffer.flip().position(HEADER_BYTES));
+ final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- fileWriter.commitWrite(position, writeBuffer.rewind());
+ fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
// Update the last entry with the correct index/term/length.
currentPosition = nextPosition;
package io.atomix.storage.journal;
import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.NonNull;
/**
* An {@link Indexed} entry read from {@link JournalSegment}.
*
- * @param checksum The CRC32 checksum of data
+ * @param checksum The {@link CRC32} checksum of data
* @param bytes Entry bytes
*/
-record SegmentEntry(int checksum, ByteBuffer bytes) {
+record SegmentEntry(int checksum, @NonNull ByteBuffer bytes) {
/**
* The size of the header, comprising of:
* <ul>
throw new IllegalArgumentException("Invalid entry bytes " + bytes);
}
}
+
+ /**
+ * Compute the {@link CRC32} checksum of a buffer. Note that the buffer will be consumed during this process.
+ *
+ * @param bytes buffer to checksum
+ * @return the checksum
+ */
+ static int computeChecksum(final ByteBuffer bytes) {
+ final var crc32 = new CRC32();
+ crc32.update(bytes);
+ return (int) crc32.getValue();
+ }
}