Improve disk entry access 10/110810/5
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 17 Mar 2024 11:22:10 +0000 (12:22 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 20 Mar 2024 08:03:36 +0000 (09:03 +0100)
Eliminate superfluous compactions while a complete entry is available.
This is done by carefully looking at the buffer and figuring out how
many bytes we actually need progress.

JIRA: CONTROLLER-2109
Change-Id: If091b5c6f74da6a989629dae49137ed492725e2d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java [new file with mode: 0644]

index 3f6371781a4061caf2b0fcb7b43d21e26036aaff..62c5b4c7678cfc90e54d290628291804eb7580f1 100644 (file)
 package io.atomix.storage.journal;
 
 import com.esotericsoftware.kryo.KryoException;
+import com.google.common.annotations.VisibleForTesting;
 import io.atomix.storage.journal.index.JournalIndex;
-
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
 
 /**
@@ -132,13 +133,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
         currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
         memory.position(memory.position() + length);
 
-        // Read more bytes from the segment if necessary.
-        if (memory.remaining() < maxEntrySize) {
-          channel.read(memory.compact());
-          memory.flip();
-        }
-
-        length = memory.getInt();
+        length = prepareNextEntry(channel, memory);
       }
     } catch (BufferUnderflowException e) {
       // No-op, position is only updated on success
@@ -147,6 +142,44 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     }
   }
 
+  @VisibleForTesting
+  static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+      int remaining = memory.remaining();
+      boolean compacted;
+      if (remaining < ENTRY_HEADER_BYTES) {
+          // We do not have the header available. Move the pointer and read.
+          channel.read(memory.compact());
+          remaining = memory.flip().remaining();
+          if (remaining < ENTRY_HEADER_BYTES) {
+              // could happen with mis-padded segment
+              return 0;
+          }
+          compacted = true;
+      } else {
+          compacted = false;
+      }
+
+      while (true) {
+          final int length = memory.mark().getInt();
+          if (remaining >= Integer.BYTES + length) {
+              // Fast path: we have the entry properly positioned
+              return length;
+          }
+
+          // Not enough data for entry, to header start
+          memory.reset();
+          if (compacted) {
+              // we have already compacted the buffer, there is just not enough data
+              return 0;
+          }
+
+          // Try to read more data and check again
+          channel.read(memory.compact());
+          remaining = memory.flip().remaining();
+          compacted = true;
+      }
+  }
+
   @Override
   Indexed<E> getLastEntry() {
     return lastEntry;
diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java
new file mode 100644 (file)
index 0000000..8e73bb9
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static io.atomix.storage.journal.DiskJournalSegmentWriter.prepareNextEntry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.function.ToIntFunction;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DiskJournalSegmentWriterTest {
+    private static final int BUFFER_SIZE = 56;
+
+    @Mock
+    private SeekableByteChannel channel;
+
+    private final ByteBuffer buffer = ByteBuffer.wrap(new byte[BUFFER_SIZE]);
+
+    @Test
+    void testReadFastPath() throws Exception {
+        buffer.putInt(42).putInt(0).put(new byte[42]).flip();
+
+        assertEquals(42, prepareNextEntry(channel, buffer));
+        assertEquals(4, buffer.position());
+        assertEquals(46, buffer.remaining());
+    }
+
+    @Test
+    void testEmptyBufferEndOfFile() throws Exception {
+        buffer.position(BUFFER_SIZE);
+
+        prepareRead(buf -> {
+            assertEquals(0, buf.position());
+            return 0;
+        });
+
+        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertEquals(0, buffer.remaining());
+    }
+
+    @Test
+    void testEmptyBuffer() throws Exception {
+        buffer.position(BUFFER_SIZE);
+
+        prepareRead(buf -> {
+            assertEquals(0, buf.position());
+            buf.putInt(20).putInt(0).put(new byte[20]);
+            return 28;
+        });
+
+        assertEquals(20, prepareNextEntry(channel, buffer));
+        assertEquals(4, buffer.position());
+        assertEquals(24, buffer.remaining());
+    }
+
+    @Test
+    void testEmptyBufferNotEnough() throws Exception {
+        buffer.position(BUFFER_SIZE);
+
+        prepareRead(buf -> {
+            assertEquals(0, buf.position());
+            buf.putInt(42).putInt(0).put(new byte[20]);
+            return 28;
+        });
+
+        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertEquals(0, buffer.position());
+        assertEquals(28, buffer.remaining());
+    }
+
+    @Test
+    void testHeaderWithNotEnough() throws Exception {
+        buffer.putInt(42).putInt(0).put(new byte[20]).flip();
+
+        prepareRead(buf -> {
+            assertEquals(28, buf.position());
+            return 0;
+        });
+
+        assertEquals(0, prepareNextEntry(channel, buffer));
+        assertEquals(28, buffer.remaining());
+        assertEquals(0, buffer.position());
+    }
+
+    final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
+        doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
+    }
+}