Refactor JournalSegmentReader 00/111000/6
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 23 Mar 2024 21:00:34 +0000 (22:00 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 25 Mar 2024 07:45:56 +0000 (08:45 +0100)
We have almost-duplicated code in {Disk,Mapped}JournalSegmentReader,
which looks very similar to what DiskJournalSegmentWriter revolves
around.

JournalSegmentReader revolves around tracking current position and
interpreting bytes read. Its specializations provide the means for
access.

JIRA: CONTROLLER-2109
Change-Id: I80b78cde5baf6b222d888e635dd5c854331f261a
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java

index bf02a90fe0d4b248177d1c5f35d28dac754f32de..766a7e7d65e5994514aec241c13e3560e8428859 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
 
 /**
  * Log segment reader.
@@ -28,74 +29,71 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
-  private final FileChannel channel;
-  private final ByteBuffer memory;
-  private long currentPosition;
-
-  DiskJournalSegmentReader(
-      FileChannel channel,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalSerdes namespace) {
-    super(segment, maxEntrySize, namespace);
-    this.channel = channel;
-    this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
-  }
+    private final FileChannel channel;
+    private final ByteBuffer buffer;
 
-  @Override
-  void setPosition(int position) {
-    currentPosition = position;
-    memory.clear().flip();
-  }
+    // tracks where memory's first available byte maps to in terms of FileChannel.position()
+    private int bufferPosition;
 
-  @Override
-  Indexed<E> readEntry(final long index) {
-    try {
-      // Read more bytes from the segment if necessary.
-      if (memory.remaining() < maxEntrySize) {
-        long position = currentPosition + memory.position();
-        channel.read(memory.clear(), position);
-        currentPosition = position;
-        memory.flip();
-      }
+    DiskJournalSegmentReader(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+            final JournalSerdes namespace) {
+        super(segment, maxEntrySize, namespace);
+        this.channel = requireNonNull(channel);
+        buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip();
+        bufferPosition = 0;
+    }
 
-      // Mark the buffer so it can be reset if necessary.
-      memory.mark();
+    @Override void invalidateCache() {
+        buffer.clear().flip();
+        bufferPosition = 0;
+    }
 
-      try {
-        // Read the length of the entry.
-        final int length = memory.getInt();
+    @Override ByteBuffer read(final int position, final int size) {
+        // calculate logical seek distance between buffer's first byte and position and split flow between
+        // forward-moving and backwards-moving code paths.
+        final int seek = bufferPosition - position;
+        return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
+    }
 
-        // If the buffer length is zero then return.
-        if (length <= 0 || length > maxEntrySize) {
-          memory.reset().limit(memory.position());
-          return null;
+    private ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
+        final int missing = buffer.limit() - seek - size;
+        if (missing <= 0) {
+            // fast path: we have the requested region
+            return buffer.slice(seek, size).asReadOnlyBuffer();
         }
 
-        // Read the checksum of the entry.
-        long checksum = memory.getInt() & 0xFFFFFFFFL;
+        // We need to read more data, but let's salvage what we can:
+        // - set buffer position to seek, which means it points to the same as position
+        // - run compact, which moves everything between position and limit onto the beginning of buffer and
+        //   sets it up to receive more bytes
+        // - start the read accounting for the seek
+        buffer.position(seek).compact();
+        readAtLeast(position + seek, missing);
+        return setAndSlice(position, size);
+    }
 
-        // Compute the checksum for the entry bytes.
-        final CRC32 crc32 = new CRC32();
-        crc32.update(memory.array(), memory.position(), length);
+    ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+        // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
+        //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
+        //       read it.
+        buffer.clear();
+        readAtLeast(position, size);
+        return setAndSlice(position, size);
+    }
 
-        // If the stored checksum equals the computed checksum, return the entry.
-        if (checksum == crc32.getValue()) {
-          int limit = memory.limit();
-          memory.limit(memory.position() + length);
-          E entry = namespace.deserialize(memory);
-          memory.limit(limit);
-          return new Indexed<>(index, entry, length);
-        } else {
-          memory.reset().limit(memory.position());
-          return null;
+    void readAtLeast(final int readPosition, final int readAtLeast) {
+        final int bytesRead;
+        try {
+            bytesRead = channel.read(buffer, readPosition);
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
-      } catch (BufferUnderflowException e) {
-        memory.reset().limit(memory.position());
-        return null;
-      }
-    } catch (IOException e) {
-      throw new StorageException(e);
+        verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
+        buffer.flip();
+    }
+
+    private ByteBuffer setAndSlice(final int position, final int size) {
+        bufferPosition = position;
+        return buffer.slice(0, size).asReadOnlyBuffer();
     }
-  }
 }
index 44c990f5a3b27884ef7e224211a418bbda3eb05f..af07a1e9534f6223b6b3b1e9fef2a674012ad4af 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
+import com.esotericsoftware.kryo.KryoException;
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
-    final int maxEntrySize;
-    final JournalSerdes namespace;
+    private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
+
     private final JournalSegment<E> segment;
+    private final JournalSerdes namespace;
+    private final int maxSegmentSize;
+    private final int maxEntrySize;
+
+    private int position;
 
     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
         this.segment = requireNonNull(segment);
+        maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
         this.namespace = requireNonNull(namespace);
     }
 
     /**
-     * Close this reader.
+     * Return the current position.
+     *
+     * @return current position.
      */
-    final void close() {
-        segment.closeReader(this);
+    final int position() {
+        return position;
     }
 
     /**
@@ -42,13 +56,91 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
      *
      * @param position new position
      */
-    abstract void setPosition(int position);
+    final void setPosition(final int position) {
+        verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
+            "Invalid position %s", position);
+        this.position = position;
+        invalidateCache();
+    }
 
     /**
-     * Reads the entry at specified index.
+     * Invalidate any cache that is present, so that the next read is coherent with the backing file.
+     */
+    abstract void invalidateCache();
+
+    /**
+     * Reads the next entry, assigning it specified index.
      *
      * @param index entry index
      * @return The entry, or {@code null}
      */
-    abstract @Nullable Indexed<E> readEntry(long index);
+    final @Nullable Indexed<E> readEntry(final long index) {
+        // Check if there is enough in the buffer remaining
+        final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
+        if (remaining < 0) {
+            // Not enough space in the segment, there can never be another entry
+            return null;
+        }
+
+        // Calculate maximum entry length not exceeding file size nor maxEntrySize
+        final var maxLength = Math.min(remaining, maxEntrySize);
+        final var buffer = read(position, maxLength + SegmentEntry.HEADER_BYTES);
+
+        // Read the entry length
+        final var length = buffer.getInt(0);
+        if (length < 1 || length > maxLength) {
+            // Invalid length, make sure next read re-tries
+            invalidateCache();
+            return null;
+        }
+
+        // Read the entry checksum
+        final int checksum = buffer.getInt(Integer.BYTES);
+
+        // Slice off the entry's bytes
+        final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length);
+        // Compute the checksum for the entry bytes.
+        final var crc32 = new CRC32();
+        crc32.update(entryBytes);
+
+        // If the stored checksum does not equal the computed checksum, do not proceed further
+        final var computed = (int) crc32.getValue();
+        if (checksum != computed) {
+            LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+            invalidateCache();
+            return null;
+        }
+
+        // Attempt to deserialize
+        final E entry;
+        try {
+            entry = namespace.deserialize(entryBytes.rewind());
+        } catch (KryoException e) {
+            // TODO: promote this to a hard error, as it should never happen
+            LOG.debug("Failed to deserialize entry", e);
+            invalidateCache();
+            return null;
+        }
+
+        // We are all set. Update the position.
+        position = position + SegmentEntry.HEADER_BYTES + length;
+        return new Indexed<>(index, entry, length);
+    }
+
+    /**
+     * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed
+     * {@link #maxSegmentSize}.
+     *
+     * @param position position to the entry header
+     * @param size to read, guaranteed to not exceed {@link #maxEntrySize}
+     * @return resulting buffer
+     */
+    abstract ByteBuffer read(int position, int size);
+
+    /**
+     * Close this reader.
+     */
+    final void close() {
+        segment.closeReader(this);
+    }
 }
index bbf100740a50d25af843dad80250c153f6013163..e9059723588893fe9a6a81b709aa2198fc567605 100644 (file)
@@ -16,9 +16,7 @@
  */
 package io.atomix.storage.journal;
 
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
 
 /**
  * Log segment reader.
@@ -26,59 +24,21 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
-  private final ByteBuffer buffer;
+    private final ByteBuffer buffer;
 
-  MappedJournalSegmentReader(
-      ByteBuffer buffer,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalSerdes namespace) {
-    super(segment, maxEntrySize, namespace);
-    this.buffer = buffer.slice();
-  }
-
-  @Override
-  void setPosition(int position) {
-    buffer.position(position);
-  }
-
-  @Override
-  Indexed<E> readEntry(final long index) {
-    // Mark the buffer so it can be reset if necessary.
-    buffer.mark();
-
-    try {
-      // Read the length of the entry.
-      final int length = buffer.getInt();
-
-      // If the buffer length is zero then return.
-      if (length <= 0 || length > maxEntrySize) {
-        buffer.reset();
-        return null;
-      }
-
-      // Read the checksum of the entry.
-      long checksum = buffer.getInt() & 0xFFFFFFFFL;
+    MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment<E> segment, final int maxEntrySize,
+            final JournalSerdes namespace) {
+        super(segment, maxEntrySize, namespace);
+        this.buffer = buffer.slice().asReadOnlyBuffer();
+    }
 
-      // Compute the checksum for the entry bytes.
-      final CRC32 crc32 = new CRC32();
-      ByteBuffer slice = buffer.slice();
-      slice.limit(length);
-      crc32.update(slice);
+    @Override
+    void invalidateCache() {
+        // No-op: the mapping is guaranteed to be coherent
+    }
 
-      // If the stored checksum equals the computed checksum, return the entry.
-      if (checksum == crc32.getValue()) {
-        slice.rewind();
-        E entry = namespace.deserialize(slice);
-        buffer.position(buffer.position() + length);
-        return new Indexed<>(index, entry, length);
-      } else {
-        buffer.reset();
-        return null;
-      }
-    } catch (BufferUnderflowException e) {
-      buffer.reset();
-      return null;
+    @Override
+    ByteBuffer read(final int position, final int size) {
+        return buffer.slice(position, size);
     }
-  }
 }