Introduce SegmentEntry 96/110896/4
author Robert Varga <robert.varga@pantheon.tech>
Wed, 20 Mar 2024 00:52:54 +0000 (01:52 +0100)
committer Robert Varga <robert.varga@pantheon.tech>
Wed, 20 Mar 2024 08:03:36 +0000 (09:03 +0100)
This is a helper DTO to communicate CRC32 and a buffer slice during
DiskJournalSegmentWriter's indexing.

This allows us to reduce the reset(long) loop to a manageable size, making
it mostly self-explanatory.

JIRA: CONTROLLER-2109
Change-Id: Ife1bddaf7684a304a0e7cdfd24ca9e774ac5143d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java [new file with mode: 0644]
atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java

index 62c5b4c7678cfc90e54d290628291804eb7580f1..cbbb7799162f135206b9a9d8e870ce88b7e4bb1d 100644 (file)
@@ -21,12 +21,14 @@ import com.google.common.annotations.VisibleForTesting;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Segment writer.
@@ -44,6 +46,7 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+  private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
 
   private final ByteBuffer memory;
@@ -90,60 +93,48 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  void reset(long index) {
-    long nextIndex = firstIndex;
-
-    // Clear the buffer indexes.
-    currentPosition = JournalSegmentDescriptor.BYTES;
-
-    try {
-      // Clear memory buffer and read fist chunk
-      channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
-      memory.flip();
-
-      // Read the entry length.
-      int length = memory.getInt();
-
-      // If the length is non-zero, read the entry.
-      while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
-
-        // Read the checksum of the entry.
-        final long checksum = memory.getInt() & 0xFFFFFFFFL;
-
-        // Slice off the entry's bytes
-        final ByteBuffer entryBytes = memory.slice();
-        entryBytes.limit(length);
-
-        // Compute the checksum for the entry bytes.
-        final CRC32 crc32 = new CRC32();
-        crc32.update(entryBytes);
-
-        // If the stored checksum does not equal the computed checksum, do not proceed further
-        if (checksum != crc32.getValue()) {
-          break;
-        }
-
-        entryBytes.rewind();
-        final E entry = namespace.deserialize(entryBytes);
-        lastEntry = new Indexed<>(nextIndex, entry, length);
-        this.index.index(nextIndex, (int) currentPosition);
-        nextIndex++;
-
-        // Update the current position for indexing.
-        currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
-        memory.position(memory.position() + length);
-
-        length = prepareNextEntry(channel, memory);
+  void reset(final long index) {
+      long nextIndex = firstIndex;
+
+      // Clear the buffer indexes.
+      currentPosition = JournalSegmentDescriptor.BYTES;
+
+      try {
+          // Clear memory buffer and read first chunk
+          channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
+          memory.flip();
+
+          while (index == 0 || nextIndex <= index) {
+              final var entry = prepareNextEntry(channel, memory, maxEntrySize);
+              if (entry == null) {
+                  break;
+              }
+
+              final var bytes = entry.bytes();
+              final var length = bytes.remaining();
+              try {
+                  lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
+              } catch (KryoException e) {
+                  // No-op, position is only updated on success
+                  LOG.debug("Failed to deserialize entry", e);
+                  break;
+              }
+
+              this.index.index(nextIndex, (int) currentPosition);
+              nextIndex++;
+
+              // Update the current position for indexing.
+              currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+              memory.position(memory.position() + length);
+          }
+      } catch (IOException e) {
+          throw new StorageException(e);
       }
-    } catch (BufferUnderflowException e) {
-      // No-op, position is only updated on success
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
   }
 
   @VisibleForTesting
-  static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+  static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
+          final int maxEntrySize) throws IOException {
       int remaining = memory.remaining();
       boolean compacted;
       if (remaining < ENTRY_HEADER_BYTES) {
@@ -152,25 +143,32 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
           remaining = memory.flip().remaining();
           if (remaining < ENTRY_HEADER_BYTES) {
               // could happen with mis-padded segment
-              return 0;
+              return null;
           }
           compacted = true;
       } else {
           compacted = false;
       }
 
+      int length;
       while (true) {
-          final int length = memory.mark().getInt();
+          length = memory.mark().getInt();
+          if (length < 1 || length > maxEntrySize) {
+              // Invalid length, rewind to the start of the header and report no entry
+              memory.reset();
+              return null;
+          }
+
           if (remaining >= Integer.BYTES + length) {
               // Fast path: we have the entry properly positioned
-              return length;
+              break;
           }
 
           // Not enough data for entry, to header start
           memory.reset();
           if (compacted) {
               // we have already compacted the buffer, there is just not enough data
-              return 0;
+              return null;
           }
 
           // Try to read more data and check again
@@ -178,6 +176,27 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
           remaining = memory.flip().remaining();
           compacted = true;
       }
+
+      // Read the checksum of the entry.
+      final int checksum = memory.getInt();
+
+      // Slice off the entry's bytes
+      final var entryBytes = memory.slice();
+      entryBytes.limit(length);
+
+      // Compute the checksum for the entry bytes.
+      final var crc32 = new CRC32();
+      crc32.update(entryBytes);
+
+      // If the stored checksum does not equal the computed checksum, do not proceed further
+      final var computed = (int) crc32.getValue();
+      if (checksum != computed) {
+          LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+          memory.reset();
+          return null;
+      }
+
+      return new SegmentEntry(checksum, entryBytes.rewind());
   }
 
   @Override
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java
new file mode 100644 (file)
index 0000000..2376cab
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The CRC32 checksum and bytes of an {@link Indexed} entry read from a {@link JournalSegment}.
+ */
+record SegmentEntry(int checksum, ByteBuffer bytes) {
+    /**
+     * The size of the entry header, comprising:
+     * <ul>
+     *   <li>32-bit signed entry length</li>
+     *   <li>32-bit unsigned CRC32 checksum</li>
+     * </ul>
+     */
+    static final int HEADER_BYTES = Integer.BYTES + Integer.BYTES;
+
+    SegmentEntry {
+        if (bytes.remaining() < 1) {
+            throw new IllegalArgumentException("Invalid entry bytes " + bytes);
+        }
+    }
+}
index 8e73bb9d5ff2fe6689607af4943da9a7f9240508..4d53cdc7365747e80ae56396f8a40607799354e3 100644 (file)
@@ -7,14 +7,17 @@
  */
 package io.atomix.storage.journal;
 
-import static io.atomix.storage.journal.DiskJournalSegmentWriter.prepareNextEntry;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
 import java.util.function.ToIntFunction;
+import org.eclipse.jdt.annotation.Nullable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -23,6 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 @ExtendWith(MockitoExtension.class)
 class DiskJournalSegmentWriterTest {
     private static final int BUFFER_SIZE = 56;
+    private static final int MAX_ENTRY_SIZE = 42;
 
     @Mock
     private SeekableByteChannel channel;
@@ -31,11 +35,13 @@ class DiskJournalSegmentWriterTest {
 
     @Test
     void testReadFastPath() throws Exception {
-        buffer.putInt(42).putInt(0).put(new byte[42]).flip();
+        buffer.putInt(42).putInt(0xE46F28FB).put(new byte[42]).flip();
 
-        assertEquals(42, prepareNextEntry(channel, buffer));
-        assertEquals(4, buffer.position());
-        assertEquals(46, buffer.remaining());
+        final var entry = prepareNextEntry(channel, buffer);
+        assertNotNull(entry);
+        assertEquals(42, entry.bytes().remaining());
+        assertEquals(8, buffer.position());
+        assertEquals(42, buffer.remaining());
     }
 
     @Test
@@ -47,7 +53,7 @@ class DiskJournalSegmentWriterTest {
             return 0;
         });
 
-        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertNull(prepareNextEntry(channel, buffer));
         assertEquals(0, buffer.remaining());
     }
 
@@ -57,13 +63,15 @@ class DiskJournalSegmentWriterTest {
 
         prepareRead(buf -> {
             assertEquals(0, buf.position());
-            buf.putInt(20).putInt(0).put(new byte[20]);
+            buf.putInt(20).putInt(0x0FD59B8D).put(new byte[20]);
             return 28;
         });
 
-        assertEquals(20, prepareNextEntry(channel, buffer));
-        assertEquals(4, buffer.position());
-        assertEquals(24, buffer.remaining());
+        final var entry = prepareNextEntry(channel, buffer);
+        assertNotNull(entry);
+        assertEquals(20, entry.bytes().remaining());
+        assertEquals(8, buffer.position());
+        assertEquals(20, buffer.remaining());
     }
 
     @Test
@@ -76,7 +84,7 @@ class DiskJournalSegmentWriterTest {
             return 28;
         });
 
-        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertNull(prepareNextEntry(channel, buffer));
         assertEquals(0, buffer.position());
         assertEquals(28, buffer.remaining());
     }
@@ -90,7 +98,7 @@ class DiskJournalSegmentWriterTest {
             return 0;
         });
 
-        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertNull(prepareNextEntry(channel, buffer));
         assertEquals(28, buffer.remaining());
         assertEquals(0, buffer.position());
     }
@@ -98,4 +106,9 @@ class DiskJournalSegmentWriterTest {
     final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
         doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
     }
+
+    private static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory)
+            throws IOException {
+        return DiskJournalSegmentWriter.prepareNextEntry(channel, memory, MAX_ENTRY_SIZE);
+    }
 }