Move ENTRY_HEADER_BYTES 07/110907/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 20 Mar 2024 15:52:24 +0000 (16:52 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 20 Mar 2024 15:52:52 +0000 (16:52 +0100)
We have SegmentEntry.HEADER_BYTES instead of ENTRY_HEADER_BYTES.

JIRA: CONTROLLER-2109
Change-Id: I4bc66155dccd3e177a61bad28be50e0bc62827ab
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java

index 771c0a89d3975666ac7c9726144fb8c3d01be6cf..64ae58c484efee459c1c672400dc198632927d24 100644 (file)
@@ -41,7 +41,7 @@ final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
       JournalSerdes namespace) {
     super(segment, maxEntrySize, index, namespace);
     this.channel = channel;
-    this.memory = ByteBuffer.allocate((maxEntrySize + JournalSegmentWriter.ENTRY_HEADER_BYTES) * 2);
+    this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
     reset();
   }
 
index cbbb7799162f135206b9a9d8e870ce88b7e4bb1d..c46b55cac8ecf4bac6098647b3964b696b8d3ba6 100644 (file)
@@ -16,6 +16,8 @@
  */
 package io.atomix.storage.journal;
 
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+
 import com.esotericsoftware.kryo.KryoException;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.storage.journal.index.JournalIndex;
@@ -47,7 +49,7 @@ import org.slf4j.LoggerFactory;
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
-  private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
+  private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
 
   private final ByteBuffer memory;
   private Indexed<E> lastEntry;
@@ -72,7 +74,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   private static ByteBuffer allocMemory(int maxEntrySize) {
-    final var buf = ByteBuffer.allocate((maxEntrySize + ENTRY_HEADER_BYTES) * 2);
+    final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
     buf.limit(0);
     return buf;
   }
@@ -124,7 +126,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
               nextIndex++;
 
               // Update the current position for indexing.
-              currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+              currentPosition = currentPosition + HEADER_BYTES + length;
               memory.position(memory.position() + length);
           }
       } catch (IOException e) {
@@ -137,11 +139,11 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
           final int maxEntrySize) throws IOException {
       int remaining = memory.remaining();
       boolean compacted;
-      if (remaining < ENTRY_HEADER_BYTES) {
+      if (remaining < HEADER_BYTES) {
           // We do not have the header available. Move the pointer and read.
           channel.read(memory.compact());
           remaining = memory.flip().remaining();
-          if (remaining < ENTRY_HEADER_BYTES) {
+          if (remaining < HEADER_BYTES) {
               // could happen with mis-padded segment
               return null;
           }
@@ -212,16 +214,16 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
     // Serialize the entry.
     try {
-      namespace.serialize(entry, memory.clear().position(ENTRY_HEADER_BYTES));
+      namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
     } catch (KryoException e) {
       throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
     }
     memory.flip();
 
-    final int length = memory.limit() - ENTRY_HEADER_BYTES;
+    final int length = memory.limit() - HEADER_BYTES;
 
     // Ensure there's enough space left in the buffer to store the entry.
-    if (maxSegmentSize - currentPosition < length + ENTRY_HEADER_BYTES) {
+    if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
       throw new BufferOverflowException();
     }
 
@@ -232,7 +234,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
     // Compute the checksum for the entry.
     final CRC32 crc32 = new CRC32();
-    crc32.update(memory.array(), ENTRY_HEADER_BYTES, memory.limit() - ENTRY_HEADER_BYTES);
+    crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
     final long checksum = crc32.getValue();
 
     // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
@@ -248,7 +250,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     this.lastEntry = indexedEntry;
     this.index.index(index, (int) currentPosition);
 
-    currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+    currentPosition = currentPosition + HEADER_BYTES + length;
     return (Indexed<T>) indexedEntry;
   }
 
index ddc70fca0e9405589760e0dc700f63efca49b1e9..805d23f2f179510e950f45dbd567e20b896e6303 100644 (file)
@@ -24,15 +24,6 @@ import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
-    /**
-     * The size of the header, comprising of:
-     * <ul>
-     *   <li>32-bit signed entry length</li>
-     *   <li>32-bit unsigned CRC32 checksum</li>
-     * </li>
-     */
-    static final int ENTRY_HEADER_BYTES = Integer.BYTES + Integer.BYTES;
-
     final @NonNull FileChannel channel;
     final @NonNull JournalIndex index;
     final @NonNull JournalSerdes namespace;
@@ -45,18 +36,18 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         this.channel = requireNonNull(channel);
         this.index = requireNonNull(index);
         this.namespace = requireNonNull(namespace);
-        this.maxSegmentSize = segment.descriptor().maxSegmentSize();
+        maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
-        this.firstIndex = segment.index();
+        firstIndex = segment.index();
     }
 
-    JournalSegmentWriter(JournalSegmentWriter<E> previous) {
-        this.channel = previous.channel;
-        this.index = previous.index;
-        this.namespace = previous.namespace;
-        this.maxSegmentSize = previous.maxSegmentSize;
-        this.maxEntrySize = previous.maxEntrySize;
-        this.firstIndex = previous.firstIndex;
+    JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+        channel = previous.channel;
+        index = previous.index;
+        namespace = previous.namespace;
+        maxSegmentSize = previous.maxSegmentSize;
+        maxEntrySize = previous.maxEntrySize;
+        firstIndex = previous.firstIndex;
     }
 
     /**
index 5749f24228d467893110abff7df3128e38e8f2c2..99180c5840321165696796a45234f5dcfcbb237e 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+
 import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
-
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
@@ -161,11 +162,11 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
     // Serialize the entry.
     int position = buffer.position();
-    if (position + ENTRY_HEADER_BYTES > buffer.limit()) {
+    if (position + HEADER_BYTES > buffer.limit()) {
       throw new BufferOverflowException();
     }
 
-    buffer.position(position + ENTRY_HEADER_BYTES);
+    buffer.position(position + HEADER_BYTES);
 
     try {
       namespace.serialize(entry, buffer);
@@ -173,7 +174,7 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
       throw new BufferOverflowException();
     }
 
-    final int length = buffer.position() - (position + ENTRY_HEADER_BYTES);
+    final int length = buffer.position() - (position + HEADER_BYTES);
 
     // If the entry length exceeds the maximum entry size then throw an exception.
     if (length > maxEntrySize) {
@@ -184,14 +185,14 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
     // Compute the checksum for the entry.
     final CRC32 crc32 = new CRC32();
-    buffer.position(position + ENTRY_HEADER_BYTES);
+    buffer.position(position + HEADER_BYTES);
     ByteBuffer slice = buffer.slice();
     slice.limit(length);
     crc32.update(slice);
     final long checksum = crc32.getValue();
 
     // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-    buffer.position(position).putInt(length).putInt((int) checksum).position(position + ENTRY_HEADER_BYTES + length);
+    buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length);
 
     // Update the last entry with the correct index/term/length.
     Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
index 2376cab7c12bb979ffe07a3c0375e6674b9c3e83..d7b9ec7cd81197d097ae6e4da760d5106a03bfc2 100644 (file)
@@ -11,6 +11,9 @@ import java.nio.ByteBuffer;
 
 /**
  * An {@link Indexed} entry read from {@link JournalSegment}.
+ *
+ * @param checksum The CRC32 checksum of data
+ * @param bytes Entry bytes
  */
 record SegmentEntry(int checksum, ByteBuffer bytes) {
     /**