From b2f070be60f9c49f74ec4fc0198460fb461c180a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 01:22:47 +0200 Subject: [PATCH] Centralize CRC32 computation We have essentially-duplicate codepaths between JournalSegment{Reader,Writer}. Move computation to SegmentEntry. JIRA: CONTROLLER-2115 Change-Id: I776b693b6e88d84ddb99c274371ac694aa536d1d Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../storage/journal/JournalSegmentReader.java | 12 +++--------- .../storage/journal/JournalSegmentWriter.java | 11 ++++------- .../atomix/storage/journal/SegmentEntry.java | 18 ++++++++++++++++-- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index 5258f4323c..bba89dfdc9 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import java.util.zip.CRC32; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,12 +100,8 @@ final class JournalSegmentReader { // Slice off the entry's bytes final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length); - // Compute the checksum for the entry bytes. - final var crc32 = new CRC32(); - crc32.update(entryBuffer); - // If the stored checksum does not equal the computed checksum, do not proceed further - final var computed = (int) crc32.getValue(); + final var computed = SegmentEntry.computeChecksum(entryBuffer); if (checksum != computed) { LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed)); invalidateCache(); @@ -116,9 +111,8 @@ final class JournalSegmentReader { // update position position += SegmentEntry.HEADER_BYTES + length; - // return bytes - entryBuffer.rewind(); - return Unpooled.buffer(length).writeBytes(entryBuffer); + // rewind and return + return Unpooled.buffer(length).writeBytes(entryBuffer.rewind()); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 317e8fd45b..63f5303ecc 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.index.JournalIndex; import io.netty.buffer.ByteBuf; import java.nio.MappedByteBuffer; -import java.util.zip.CRC32; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; @@ -103,16 +102,14 @@ final class JournalSegmentWriter { } // allocate buffer and write data - final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES); - writeBuffer.put(buf.nioBuffer()); + final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES); + writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length); // Compute the checksum for the entry. - final var crc32 = new CRC32(); - crc32.update(writeBuffer.flip().position(HEADER_BYTES)); + final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length)); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); - fileWriter.commitWrite(position, writeBuffer.rewind()); + fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum)); // Update the last entry with the correct index/term/length. currentPosition = nextPosition; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java index be6c6ba831..432f9b9dec 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java @@ -16,14 +16,16 @@ package io.atomix.storage.journal; import java.nio.ByteBuffer; +import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; /** * An {@link Indexed} entry read from {@link JournalSegment}. * - * @param checksum The CRC32 checksum of data + * @param checksum The {@link CRC32} checksum of data * @param bytes Entry bytes */ -record SegmentEntry(int checksum, ByteBuffer bytes) { +record SegmentEntry(int checksum, @NonNull ByteBuffer bytes) { /** * The size of the header, comprising of: *
    @@ -38,4 +40,16 @@ record SegmentEntry(int checksum, ByteBuffer bytes) { throw new IllegalArgumentException("Invalid entry bytes " + bytes); } } + + /** + * Compute the {@link CRC32} checksum of a buffer. Note that the buffer will be consumed during this process. + * + * @param bytes buffer to checksum + * @return the checksum + */ + static int computeChecksum(final ByteBuffer bytes) { + final var crc32 = new CRC32(); + crc32.update(bytes); + return (int) crc32.getValue(); + } } -- 2.36.6