Unify Disk segment reading
author    Robert Varga <robert.varga@pantheon.tech>
          Sat, 23 Mar 2024 23:52:08 +0000 (00:52 +0100)
committer Robert Varga <nite@hq.sk>
          Mon, 25 Mar 2024 13:14:34 +0000 (13:14 +0000)
DiskJournalSegmentWriter contains code that duplicates what DiskFileReader
already does. Let's remediate the situation by giving the writer an internal
DiskFileReader, through which all read-only access to the file goes. A
simplified sketch of the resulting composition follows the changed-file list
below.

JIRA: CONTROLLER-2109
Change-Id: I0a5ffd1aebbea71513f6cd70e2138562755316fa
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java [deleted file]

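Before the diffs, a purely illustrative sketch (not part of the patch) of the composition this change introduces: the writer keeps its FileChannel for writes only and delegates all read-side work to a DiskFileReader wrapped in a JournalSegmentReader. The stand-in types below are simplified stubs, not the project's real classes, and the buffer sizing is a placeholder.

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Simplified stand-ins so the sketch compiles on its own; the real classes in
// io.atomix.storage.journal carry much more behaviour.
final class FileReaderStub {
    FileReaderStub(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
        // buffered, read-only access to the segment file would live here
    }
}

final class SegmentReaderStub {
    SegmentReaderStub(final FileReaderStub fileReader) {
        // entry-level reads (header parsing, CRC check, deserialization) on top of the file reader
    }
}

final class DiskWriterSketch {
    private final FileChannel channel;      // used for writes only
    private final SegmentReaderStub reader; // every read goes through the reader stack
    private final ByteBuffer buffer;        // write-side scratch space for serializing entries

    DiskWriterSketch(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
        this.channel = channel;
        // Placeholder sizing; the patch delegates to DiskFileReader.allocateBuffer() instead.
        buffer = ByteBuffer.allocate(Math.min(maxSegmentSize, maxEntrySize * 2));
        reader = new SegmentReaderStub(new FileReaderStub(path, channel, maxSegmentSize, maxEntrySize));
    }
}

The constructors in the DiskJournalSegmentWriter diff below perform exactly this wiring, just against the real DiskFileReader and JournalSegmentReader types.
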
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
index 7d36df238026eb6bceaa3a73d3e5e362ab82921e..311d16b1500201fb39edb77fc0dc67ba4265ce53 100644 (file)
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
-import org.checkerframework.checker.nullness.qual.NonNull;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
@@ -40,12 +40,21 @@ final class DiskFileReader extends FileReader {
     private int bufferPosition;
 
     DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
+        this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize));
+    }
+
+    // Note: take ownership of the buffer
+    DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) {
         super(path);
         this.channel = requireNonNull(channel);
-        buffer = ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize)).flip();
+        this.buffer = buffer.flip();
         bufferPosition = 0;
     }
 
+    static ByteBuffer allocateBuffer(final int maxSegmentSize, final int maxEntrySize) {
+        return ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize));
+    }
+
     private static int chooseBufferSize(final int maxSegmentSize, final int maxEntrySize) {
         if (maxSegmentSize <= MIN_IO_SIZE) {
             // just buffer the entire segment
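
The "take ownership of the buffer" note above is the important contract of the new constructor: once the caller hands the ByteBuffer over, it must not touch it again, because the reader flips it and manages its position internally. A hedged usage sketch follows; it assumes it is compiled inside the io.atomix.storage.journal package, where the package-private DiskFileReader is visible.

import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: the two ways to construct a DiskFileReader after this change.
final class DiskFileReaderWiringSketch {
    static void wire(final Path segmentFile, final int maxSegmentSize, final int maxEntrySize) throws Exception {
        try (var channel = FileChannel.open(segmentFile, StandardOpenOption.READ)) {
            // Existing form: the reader sizes and owns its internal buffer.
            final var selfSized = new DiskFileReader(segmentFile, channel, maxSegmentSize, maxEntrySize);

            // New form: allocate the buffer up front (so its capacity is known to the caller)
            // and hand it over. Ownership transfers; the caller must not read or write it afterwards.
            final var buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
            final var shared = new DiskFileReader(segmentFile, channel, buffer);
        }
    }
}
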
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
index c46b55cac8ecf4bac6098647b3964b696b8d3ba6..e51de40687c9723ff74dc3340975f2260875003a 100644 (file)
@@ -19,18 +19,14 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 
 import com.esotericsoftware.kryo.KryoException;
-import com.google.common.annotations.VisibleForTesting;
+import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
-import org.eclipse.jdt.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Segment writer.
@@ -48,37 +44,34 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
-  private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
 
-  private final ByteBuffer memory;
+  private final JournalSegmentReader<E> reader;
+  private final ByteBuffer buffer;
+
   private Indexed<E> lastEntry;
   private long currentPosition;
 
-  DiskJournalSegmentWriter(
-      FileChannel channel,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalIndex index,
-      JournalSerdes namespace) {
+  DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+          final JournalIndex index, final JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
-    memory = allocMemory(maxEntrySize);
+
+    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+    final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+    reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
     reset(0);
   }
 
-  DiskJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
     super(previous);
-    memory = allocMemory(maxEntrySize);
+
+    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+    final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+    reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
     lastEntry = previous.getLastEntry();
     currentPosition = position;
   }
 
-  private static ByteBuffer allocMemory(int maxEntrySize) {
-    final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
-    buf.limit(0);
-    return buf;
-  }
-
   @Override
   MappedByteBuffer buffer() {
     return null;
@@ -96,109 +89,36 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   void reset(final long index) {
-      long nextIndex = firstIndex;
-
-      // Clear the buffer indexes.
-      currentPosition = JournalSegmentDescriptor.BYTES;
-
+      // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+      reader.invalidateCache();
       try {
-          // Clear memory buffer and read fist chunk
-          channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
-          memory.flip();
-
-          while (index == 0 || nextIndex <= index) {
-              final var entry = prepareNextEntry(channel, memory, maxEntrySize);
-              if (entry == null) {
-                  break;
-              }
-
-              final var bytes = entry.bytes();
-              final var length = bytes.remaining();
-              try {
-                  lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
-              } catch (KryoException e) {
-                  // No-op, position is only updated on success
-                  LOG.debug("Failed to deserialize entry", e);
-                  break;
-              }
-
-              this.index.index(nextIndex, (int) currentPosition);
-              nextIndex++;
-
-              // Update the current position for indexing.
-              currentPosition = currentPosition + HEADER_BYTES + length;
-              memory.position(memory.position() + length);
-          }
-      } catch (IOException e) {
-          throw new StorageException(e);
+          resetWithBuffer(index);
+      } finally {
+          // Make sure reader does not see anything we've done
+          reader.invalidateCache();
       }
   }
 
-  @VisibleForTesting
-  static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
-          final int maxEntrySize) throws IOException {
-      int remaining = memory.remaining();
-      boolean compacted;
-      if (remaining < HEADER_BYTES) {
-          // We do not have the header available. Move the pointer and read.
-          channel.read(memory.compact());
-          remaining = memory.flip().remaining();
-          if (remaining < HEADER_BYTES) {
-              // could happen with mis-padded segment
-              return null;
-          }
-          compacted = true;
-      } else {
-          compacted = false;
-      }
+  private void resetWithBuffer(final long index) {
+      long nextIndex = firstIndex;
 
-      int length;
-      while (true) {
-          length = memory.mark().getInt();
-          if (length < 1 || length > maxEntrySize) {
-              // Invalid length,
-              memory.reset();
-              return null;
-          }
+      // Clear the buffer indexes and acquire ownership of the buffer
+      currentPosition = JournalSegmentDescriptor.BYTES;
+      reader.setPosition(JournalSegmentDescriptor.BYTES);
 
-          if (remaining >= Integer.BYTES + length) {
-              // Fast path: we have the entry properly positioned
+      while (index == 0 || nextIndex <= index) {
+          final var entry = reader.readEntry(nextIndex);
+          if (entry == null) {
               break;
           }
 
-          // Not enough data for entry, to header start
-          memory.reset();
-          if (compacted) {
-              // we have already compacted the buffer, there is just not enough data
-              return null;
-          }
+          lastEntry = entry;
+          this.index.index(nextIndex, (int) currentPosition);
+          nextIndex++;
 
-          // Try to read more data and check again
-          channel.read(memory.compact());
-          remaining = memory.flip().remaining();
-          compacted = true;
+          // Update the current position for indexing.
+          currentPosition = currentPosition + HEADER_BYTES + entry.size();
       }
-
-      // Read the checksum of the entry.
-      final int checksum = memory.getInt();
-
-      // Slice off the entry's bytes
-      final var entryBytes = memory.slice();
-      entryBytes.limit(length);
-
-      // Compute the checksum for the entry bytes.
-      final var crc32 = new CRC32();
-      crc32.update(entryBytes);
-
-      // If the stored checksum does not equal the computed checksum, do not proceed further
-      final var computed = (int) crc32.getValue();
-      if (checksum != computed) {
-          LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
-          memory.reset();
-          return null;
-      }
-
-      return new SegmentEntry(checksum, entryBytes.rewind());
   }
 
   @Override
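
The reworked reset() leans on three JournalSegmentReader operations that this diff does not show: invalidateCache(), setPosition() and readEntry(). The contract sketched below is inferred purely from the call sites above and is an assumption, not the class's actual API; IndexedStub is a minimal stand-in for the existing Indexed wrapper.

// Inferred contract only; the method names and shapes come from the call sites above,
// not from the actual JournalSegmentReader source.
interface SegmentReaderContract<E> {
    // Drop any bytes buffered on the read side so the next read hits the file again.
    // The writer brackets its scan with this call, ensuring its own subsequent writes
    // are never masked by stale cached data.
    void invalidateCache();

    // Move the reader to an absolute offset within the segment file;
    // JournalSegmentDescriptor.BYTES marks the start of the first entry.
    void setPosition(int position);

    // Read, CRC-validate and deserialize the entry expected at the given index, or return
    // null once no further valid entry exists.
    IndexedStub<E> readEntry(long index);
}

// Minimal stand-in for the existing Indexed wrapper, just enough for this sketch.
record IndexedStub<E>(long index, E entry, int size) { }
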
@@ -208,54 +128,52 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(T entry) {
-    // Store the entry index.
-    final long index = getNextIndex();
-
-    // Serialize the entry.
-    try {
-      namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
-    } catch (KryoException e) {
-      throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-    }
-    memory.flip();
+  <T extends E> Indexed<T> append(final T entry) {
+      // Store the entry index.
+      final long index = getNextIndex();
 
-    final int length = memory.limit() - HEADER_BYTES;
+      // Serialize the entry.
+      try {
+          namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
+      } catch (KryoException e) {
+          throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+      }
+      buffer.flip();
 
-    // Ensure there's enough space left in the buffer to store the entry.
-    if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
-      throw new BufferOverflowException();
-    }
+      final int length = buffer.limit() - HEADER_BYTES;
+      // Ensure there's enough space left in the buffer to store the entry.
+      if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
+          throw new BufferOverflowException();
+      }
 
-    // If the entry length exceeds the maximum entry size then throw an exception.
-    if (length > maxEntrySize) {
-      throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
-    }
+      // If the entry length exceeds the maximum entry size then throw an exception.
+      if (length > maxEntrySize) {
+          throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
+      }
 
-    // Compute the checksum for the entry.
-    final CRC32 crc32 = new CRC32();
-    crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
-    final long checksum = crc32.getValue();
+      // Compute the checksum for the entry.
+      final var crc32 = new CRC32();
+      crc32.update(buffer.slice(HEADER_BYTES, length));
 
-    // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-    memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum);
-    try {
-      channel.write(memory, currentPosition);
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
+      // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+      buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+      try {
+          channel.write(buffer, currentPosition);
+      } catch (IOException e) {
+          throw new StorageException(e);
+      }
 
-    // Update the last entry with the correct index/term/length.
-    Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
-    this.lastEntry = indexedEntry;
-    this.index.index(index, (int) currentPosition);
+      // Update the last entry with the correct index/term/length.
+      final var indexedEntry = new Indexed<E>(index, entry, length);
+      lastEntry = indexedEntry;
+      this.index.index(index, (int) currentPosition);
 
-    currentPosition = currentPosition + HEADER_BYTES + length;
-    return (Indexed<T>) indexedEntry;
+      currentPosition = currentPosition + HEADER_BYTES + length;
+      return (Indexed<T>) indexedEntry;
   }
 
   @Override
-  void truncate(long index) {
+  void truncate(final long index) {
     // If the index is greater than or equal to the last index, skip the truncate.
     if (index >= getLastIndex()) {
       return;
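
For reference, the frame that append() writes, and that the reader re-validates during reset(), is a 4-byte length, a 4-byte CRC32 of the payload, then the serialized entry; HEADER_BYTES covers the first two fields. The standalone sketch below reproduces that framing with a plain byte[] standing in for the Kryo-serialized entry. It mirrors the buffer handling in append() but is not the project's code.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class EntryFramingSketch {
    // 4 bytes length + 4 bytes CRC32, matching SegmentEntry.HEADER_BYTES (length plus checksum).
    private static final int HEADER_BYTES = Integer.BYTES * 2;

    // Lay out [length][crc32][payload] the way append() does: the payload goes in first at
    // offset HEADER_BYTES, then length and checksum are patched into the header with absolute puts.
    static ByteBuffer frame(final byte[] payload) {
        final var buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        buf.position(HEADER_BYTES);
        buf.put(payload);
        buf.flip();

        // The checksum covers only the payload, never the header.
        final var crc32 = new CRC32();
        crc32.update(buf.slice(HEADER_BYTES, payload.length));

        // The returned buffer is positioned at 0 and limited to the full frame,
        // ready for a positional channel.write(), just like in append().
        return buf.putInt(0, payload.length).putInt(Integer.BYTES, (int) crc32.getValue());
    }

    public static void main(final String[] args) {
        final var framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        System.out.printf("length=%d crc32=0x%08x%n", framed.getInt(0), framed.getInt(Integer.BYTES));
    }
}
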
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
index 805d23f2f179510e950f45dbd567e20b896e6303..23b32b2e094889332a2ef2152f7cf8e59b022eb0 100644 (file)
@@ -25,6 +25,7 @@ import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
     final @NonNull FileChannel channel;
+    final @NonNull JournalSegment<E> segment;
     final @NonNull JournalIndex index;
     final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
@@ -34,6 +35,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
             final JournalIndex index, final JournalSerdes namespace) {
         this.channel = requireNonNull(channel);
+        this.segment = requireNonNull(segment);
         this.index = requireNonNull(index);
         this.namespace = requireNonNull(namespace);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
@@ -43,6 +45,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
 
     JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
         channel = previous.channel;
+        segment = previous.segment;
         index = previous.index;
         namespace = previous.namespace;
         maxSegmentSize = previous.maxSegmentSize;
diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/DiskJournalSegmentWriterTest.java
deleted file mode 100644 (file)
index 4d53cdc..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package io.atomix.storage.journal;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.util.function.ToIntFunction;
-import org.eclipse.jdt.annotation.Nullable;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class DiskJournalSegmentWriterTest {
-    private static final int BUFFER_SIZE = 56;
-    private static final int MAX_ENTRY_SIZE = 42;
-
-    @Mock
-    private SeekableByteChannel channel;
-
-    private final ByteBuffer buffer = ByteBuffer.wrap(new byte[BUFFER_SIZE]);
-
-    @Test
-    void testReadFastPath() throws Exception {
-        buffer.putInt(42).putInt(0xE46F28FB).put(new byte[42]).flip();
-
-        final var entry = prepareNextEntry(channel, buffer);
-        assertNotNull(entry);
-        assertEquals(42, entry.bytes().remaining());
-        assertEquals(8, buffer.position());
-        assertEquals(42, buffer.remaining());
-    }
-
-    @Test
-    void testEmptyBufferEndOfFile() throws Exception {
-        buffer.position(BUFFER_SIZE);
-
-        prepareRead(buf -> {
-            assertEquals(0, buf.position());
-            return 0;
-        });
-
-        assertNull(prepareNextEntry(channel, buffer));
-        assertEquals(0, buffer.remaining());
-    }
-
-    @Test
-    void testEmptyBuffer() throws Exception {
-        buffer.position(BUFFER_SIZE);
-
-        prepareRead(buf -> {
-            assertEquals(0, buf.position());
-            buf.putInt(20).putInt(0x0FD59B8D).put(new byte[20]);
-            return 28;
-        });
-
-        final var entry = prepareNextEntry(channel, buffer);
-        assertNotNull(entry);
-        assertEquals(20, entry.bytes().remaining());
-        assertEquals(8, buffer.position());
-        assertEquals(20, buffer.remaining());
-    }
-
-    @Test
-    void testEmptyBufferNotEnough() throws Exception {
-        buffer.position(BUFFER_SIZE);
-
-        prepareRead(buf -> {
-            assertEquals(0, buf.position());
-            buf.putInt(42).putInt(0).put(new byte[20]);
-            return 28;
-        });
-
-        assertNull(prepareNextEntry(channel, buffer));
-        assertEquals(0, buffer.position());
-        assertEquals(28, buffer.remaining());
-    }
-
-    @Test
-    void testHeaderWithNotEnough() throws Exception {
-        buffer.putInt(42).putInt(0).put(new byte[20]).flip();
-
-        prepareRead(buf -> {
-            assertEquals(28, buf.position());
-            return 0;
-        });
-
-        assertNull(prepareNextEntry(channel, buffer));
-        assertEquals(28, buffer.remaining());
-        assertEquals(0, buffer.position());
-    }
-
-    final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
-        doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
-    }
-
-    private static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory)
-            throws IOException {
-        return DiskJournalSegmentWriter.prepareNextEntry(channel, memory, MAX_ENTRY_SIZE);
-    }
-}