Factor out FileReader interface 01/111001/5
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 23 Mar 2024 21:52:25 +0000 (22:52 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 25 Mar 2024 09:39:54 +0000 (10:39 +0100)
As it turns out, JournalSegmentReader's specializations are providing a
very simple API they provide via abstract methods.

Extract this API into FileReader, allowing JournalSegmentReader to
become a final class, reducing mental requirements to understand what is
going on.

JIRA: CONTROLLER-2109
Change-Id: I4199f10a9483b18ac381f16e2b0d818f2d74ac16
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java [moved from atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java with 72% similarity]
atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java [moved from atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java with 65% similarity]

similarity index 72%
rename from atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java
rename to atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
index 766a7e7d65e5994514aec241c13e3560e8428859..15c3d9b5b06ba5c797cc75fd588f20fea97aa341 100644 (file)
@@ -1,6 +1,5 @@
 /*
- * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,40 +21,45 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import org.checkerframework.checker.nullness.qual.NonNull;
 
 /**
- * Log segment reader.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
+ * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
  */
-final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
+final class DiskFileReader extends FileReader {
     private final FileChannel channel;
     private final ByteBuffer buffer;
 
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
-    DiskJournalSegmentReader(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-            final JournalSerdes namespace) {
-        super(segment, maxEntrySize, namespace);
+    DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) {
+        super(path);
         this.channel = requireNonNull(channel);
-        buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip();
+        buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip();
         bufferPosition = 0;
     }
 
-    @Override void invalidateCache() {
+    private static int chooseBufferSize(final int maxEntrySize) {
+        return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2;
+    }
+
+    @Override
+    void invalidateCache() {
         buffer.clear().flip();
         bufferPosition = 0;
     }
 
-    @Override ByteBuffer read(final int position, final int size) {
+    @Override
+    ByteBuffer read(final int position, final int size) {
         // calculate logical seek distance between buffer's first byte and position and split flow between
         // forward-moving and backwards-moving code paths.
         final int seek = bufferPosition - position;
         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
     }
 
-    private ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
+    private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
         final int missing = buffer.limit() - seek - size;
         if (missing <= 0) {
             // fast path: we have the requested region
@@ -72,7 +76,7 @@ final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
         return setAndSlice(position, size);
     }
 
-    ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+    private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
         //       read it.
@@ -81,7 +85,7 @@ final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
         return setAndSlice(position, size);
     }
 
-    void readAtLeast(final int readPosition, final int readAtLeast) {
+    private void readAtLeast(final int readPosition, final int readAtLeast) {
         final int bytesRead;
         try {
             bytesRead = channel.read(buffer, readPosition);
@@ -92,7 +96,7 @@ final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
         buffer.flip();
     }
 
-    private ByteBuffer setAndSlice(final int position, final int size) {
+    private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
         bufferPosition = position;
         return buffer.slice(0, size).asReadOnlyBuffer();
     }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
new file mode 100644 (file)
index 0000000..fdc0597
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * An abstraction over how to read a {@link JournalSegmentFile}.
+ */
+abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
+    private final Path path;
+
+    FileReader(final Path path) {
+        this.path = requireNonNull(path);
+    }
+
+    /**
+     * Invalidate any cache that is present, so that the next read is coherent with the backing file.
+     */
+    abstract void invalidateCache();
+
+    /**
+     * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed the maximum
+     * segment size nor maximum entry size.
+     *
+     * @param position position to the entry header
+     * @param size to read
+     * @return resulting buffer
+     */
+    abstract @NonNull ByteBuffer read(int position, int size);
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("path", path).toString();
+    }
+}
index d21d9051b6e6c97fc71467cf186df4f48ea0e353..81699a094ac9c0927abe32a5612ee0c5e5a242ab 100644 (file)
@@ -185,9 +185,10 @@ final class JournalSegment<E> implements AutoCloseable {
     acquire();
 
     final var buffer = writer.buffer();
-    final var reader = buffer == null
-      ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
-        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+    final var path = file.file().toPath();
+    final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
+        : new DiskFileReader(path, channel, maxEntrySize);
+    final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace);
     reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
     return reader;
index af07a1e9534f6223b6b3b1e9fef2a674012ad4af..93ccd1748eb9d35ab3789ccbfce0d6ac41f073b3 100644 (file)
@@ -19,24 +19,26 @@ import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
 import com.esotericsoftware.kryo.KryoException;
-import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
+final class JournalSegmentReader<E> {
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
 
     private final JournalSegment<E> segment;
     private final JournalSerdes namespace;
+    private final FileReader fileReader;
     private final int maxSegmentSize;
     private final int maxEntrySize;
 
     private int position;
 
-    JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
+    JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
+            final int maxEntrySize, final JournalSerdes namespace) {
         this.segment = requireNonNull(segment);
+        this.fileReader = requireNonNull(fileReader);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
         this.namespace = requireNonNull(namespace);
@@ -47,7 +49,7 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
      *
      * @return current position.
      */
-    final int position() {
+    int position() {
         return position;
     }
 
@@ -56,17 +58,19 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
      *
      * @param position new position
      */
-    final void setPosition(final int position) {
+    void setPosition(final int position) {
         verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
             "Invalid position %s", position);
         this.position = position;
-        invalidateCache();
+        fileReader.invalidateCache();
     }
 
     /**
      * Invalidate any cache that is present, so that the next read is coherent with the backing file.
      */
-    abstract void invalidateCache();
+    void invalidateCache() {
+        fileReader.invalidateCache();
+    }
 
     /**
      * Reads the next entry, assigning it specified index.
@@ -74,7 +78,7 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
      * @param index entry index
      * @return The entry, or {@code null}
      */
-    final @Nullable Indexed<E> readEntry(final long index) {
+    @Nullable Indexed<E> readEntry(final long index) {
         // Check if there is enough in the buffer remaining
         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
         if (remaining < 0) {
@@ -84,7 +88,7 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
 
         // Calculate maximum entry length not exceeding file size nor maxEntrySize
         final var maxLength = Math.min(remaining, maxEntrySize);
-        final var buffer = read(position, maxLength + SegmentEntry.HEADER_BYTES);
+        final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
 
         // Read the entry length
         final var length = buffer.getInt(0);
@@ -127,20 +131,10 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
         return new Indexed<>(index, entry, length);
     }
 
-    /**
-     * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed
-     * {@link #maxSegmentSize}.
-     *
-     * @param position position to the entry header
-     * @param size to read, guaranteed to not exceed {@link #maxEntrySize}
-     * @return resulting buffer
-     */
-    abstract ByteBuffer read(int position, int size);
-
     /**
      * Close this reader.
      */
-    final void close() {
+    void close() {
         segment.closeReader(this);
     }
 }
similarity index 65%
rename from atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java
rename to atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
index e9059723588893fe9a6a81b709aa2198fc567605..204fd7255022a9ab2f0d5d303f7ee380b559addb 100644 (file)
@@ -1,6 +1,5 @@
 /*
- * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package io.atomix.storage.journal;
 
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 
 /**
- * Log segment reader.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
+ * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file.
  */
-final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
+final class MappedFileReader extends FileReader {
     private final ByteBuffer buffer;
 
-    MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment<E> segment, final int maxEntrySize,
-            final JournalSerdes namespace) {
-        super(segment, maxEntrySize, namespace);
+    MappedFileReader(final Path path, final ByteBuffer buffer) {
+        super(path);
         this.buffer = buffer.slice().asReadOnlyBuffer();
     }