Centralize CRC32 computation 30/111630/3
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 23:22:47 +0000 (01:22 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 6 May 2024 00:08:13 +0000 (02:08 +0200)
We have essentially-duplicate codepaths between
JournalSegment{Reader,Writer}. Move computation to SegmentEntry.

JIRA: CONTROLLER-2115
Change-Id: I776b693b6e88d84ddb99c274371ac694aa536d1d
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java

index 5258f4323caaf5ac401ec1f1c0c8644e77225265..bba89dfdc96b13921645d02217de9145dc840c96 100644 (file)
@@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,12 +100,8 @@ final class JournalSegmentReader {
 
         // Slice off the entry's bytes
         final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
-        // Compute the checksum for the entry bytes.
-        final var crc32 = new CRC32();
-        crc32.update(entryBuffer);
-
         // If the stored checksum does not equal the computed checksum, do not proceed further
-        final var computed = (int) crc32.getValue();
+        final var computed = SegmentEntry.computeChecksum(entryBuffer);
         if (checksum != computed) {
             LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
             invalidateCache();
@@ -116,9 +111,8 @@ final class JournalSegmentReader {
         // update position
         position += SegmentEntry.HEADER_BYTES + length;
 
-        // return bytes
-        entryBuffer.rewind();
-        return Unpooled.buffer(length).writeBytes(entryBuffer);
+        // rewind and return
+        return Unpooled.buffer(length).writeBytes(entryBuffer.rewind());
     }
 
     /**
index 317e8fd45b5360b0d84f5a52f91328da8de7354e..63f5303ecc99ec7e04d8c7849cd502d77bee3082 100644 (file)
@@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.netty.buffer.ByteBuf;
 import java.nio.MappedByteBuffer;
-import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
@@ -103,16 +102,14 @@ final class JournalSegmentWriter {
         }
 
         // allocate buffer and write data
-        final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
-        writeBuffer.put(buf.nioBuffer());
+        final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
+        writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
 
         // Compute the checksum for the entry.
-        final var crc32 = new CRC32();
-        crc32.update(writeBuffer.flip().position(HEADER_BYTES));
+        final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
 
         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-        writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-        fileWriter.commitWrite(position, writeBuffer.rewind());
+        fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
 
         // Update the last entry with the correct index/term/length.
         currentPosition = nextPosition;
index be6c6ba831dd27e1af054cacc9eed407f7e297e7..432f9b9dec0395385c8e930c7eae3bbd51307ab2 100644 (file)
 package io.atomix.storage.journal;
 
 import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * An {@link Indexed} entry read from {@link JournalSegment}.
  *
- * @param checksum The CRC32 checksum of data
+ * @param checksum The {@link CRC32} checksum of data
  * @param bytes Entry bytes
  */
-record SegmentEntry(int checksum, ByteBuffer bytes) {
+record SegmentEntry(int checksum, @NonNull ByteBuffer bytes) {
     /**
      * The size of the header, comprising of:
      * <ul>
@@ -38,4 +40,16 @@ record SegmentEntry(int checksum, ByteBuffer bytes) {
             throw new IllegalArgumentException("Invalid entry bytes " + bytes);
         }
     }
+
+    /**
+     * Compute the {@link CRC32} checksum of a buffer. Note that the buffer will be consumed during this process.
+     *
+     * @param bytes buffer to checksum
+     * @return the checksum
+     */
+    static int computeChecksum(final ByteBuffer bytes) {
+        final var crc32 = new CRC32();
+        crc32.update(bytes);
+        return (int) crc32.getValue();
+    }
 }