Introduce JournalSegmentReader 77/110577/2
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Mar 2024 16:33:55 +0000 (17:33 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
The two implementations for accessing the file have a common API and
potentially share a lot of code. Introduce JournalSegmentReader to act
as a common superclass.

JIRA: CONTROLLER-2098
Change-Id: Ie8b524812f6193206e1df100ac55ed4f11ad9f5c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java

index 46d7a659568eb0e3b7703adc461cb1d1ed1c9d06..3f8afdf4fa659fd631270d0e483ec0823c32d96c 100644 (file)
@@ -30,13 +30,9 @@ import java.util.zip.Checksum;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
+final class FileChannelJournalSegmentReader<E> extends JournalSegmentReader<E> {
   private final FileChannel channel;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
   private final ByteBuffer memory;
-  private final long firstIndex;
   private Indexed<E> currentEntry;
   private Indexed<E> nextEntry;
   private long currentPosition;
@@ -47,20 +43,12 @@ final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
+    super(segment, maxEntrySize, index, namespace);
     this.channel = channel;
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
-    this.namespace = namespace;
     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
-    this.firstIndex = segment.index();
     reset();
   }
 
-  @Override
-  public long getFirstIndex() {
-    return firstIndex;
-  }
-
   @Override
   public long getCurrentIndex() {
     return currentEntry != null ? currentEntry.index() : 0;
@@ -185,9 +173,4 @@ final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
       throw new StorageException(e);
     }
   }
-
-  @Override
-  public void close() {
-    // Do nothing. The parent reader manages the channel.
-  }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
new file mode 100644 (file)
index 0000000..23ef1f3
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.atomix.storage.journal.index.JournalIndex;
+
+abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
+        permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
+    final int maxEntrySize;
+    final JournalIndex index;
+    final JournalSerdes namespace;
+    final long firstIndex;
+
+    JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
+            final JournalSerdes namespace) {
+        this.maxEntrySize = maxEntrySize;
+        this.index = requireNonNull(index);
+        this.namespace = requireNonNull(namespace);
+        this.firstIndex = segment.index();
+    }
+
+    @Override
+    public final long getFirstIndex() {
+        return firstIndex;
+    }
+
+    @Override
+    public final void close() {
+        // FIXME: CONTROLLER-2098: remove this method
+    }
+}
index c8aa8fcd401f92de3d7c59f8e918399c0c0591b6..b0d1b5e3ebe40ee755c6415b8ad27f2a39091a36 100644 (file)
@@ -23,13 +23,13 @@ import java.nio.channels.FileChannel;
 /**
  * Mappable log segment reader.
  */
-class MappableJournalSegmentReader<E> implements JournalReader<E> {
+final class MappableJournalSegmentReader<E> implements JournalReader<E> {
   private final JournalSegment<E> segment;
   private final FileChannel channel;
   private final int maxEntrySize;
   private final JournalIndex index;
   private final JournalSerdes namespace;
-  private JournalReader<E> reader;
+  private JournalSegmentReader<E> reader;
 
   MappableJournalSegmentReader(
       FileChannel channel,
index 1a337010821c68cc048b0a1d6bc49fbf549b8017..5e847ce81d6cc446aac1310497d2f787675b75d9 100644 (file)
@@ -27,12 +27,8 @@ import java.util.zip.CRC32;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-final class MappedJournalSegmentReader<E> implements JournalReader<E> {
+final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
   private final ByteBuffer buffer;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
-  private final long firstIndex;
   private Indexed<E> currentEntry;
   private Indexed<E> nextEntry;
 
@@ -42,19 +38,11 @@ final class MappedJournalSegmentReader<E> implements JournalReader<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
+    super(segment, maxEntrySize, index, namespace);
     this.buffer = buffer.slice();
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
-    this.namespace = namespace;
-    this.firstIndex = segment.index();
     reset();
   }
 
-  @Override
-  public long getFirstIndex() {
-    return firstIndex;
-  }
-
   @Override
   public long getCurrentIndex() {
     return currentEntry != null ? currentEntry.index() : 0;
@@ -165,9 +153,4 @@ final class MappedJournalSegmentReader<E> implements JournalReader<E> {
       nextEntry = null;
     }
   }
-
-  @Override
-  public void close() {
-    // Do nothing. The writer is responsible for cleaning the mapped buffer.
-  }
 }