Introduce JournalSegmentWriter 74/110574/4
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Mar 2024 11:22:56 +0000 (12:22 +0100)
committerRobert Varga <nite@hq.sk>
Mon, 11 Mar 2024 12:51:11 +0000 (12:51 +0000)
MappableJournalSegmentWriter needs to flip between the two
implementations with seamless handoff.

Introduce JournalSegmentWriter as common superclass, exposing
currently-used interface.

JIRA: CONTROLLER-2043
JIRA: CONTROLLER-2098
Change-Id: I11f01ea027a757a32e25bf434d29abeca7d40f6e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/pom.xml
atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index 20de4808bd6ae8b2856ea7c3255d319e0820057f..c6f7dc3031834a7fd85faa41a03e57d42cf803bc 100644 (file)
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jdt</groupId>
+      <artifactId>org.eclipse.jdt.annotation</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.esotericsoftware</groupId>
       <artifactId>kryo</artifactId>
index f6dda9c6e0d69d0b015177af27e6c3815bfb4d7c..fa8b02bded1cdabff58370cf66e12aa12a9318da 100644 (file)
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 
@@ -40,16 +41,11 @@ import java.util.zip.CRC32;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
+final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
 
   private final FileChannel channel;
-  private final JournalSegment<E> segment;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
   private final ByteBuffer memory;
-  private final long firstIndex;
   private Indexed<E> lastEntry;
   private long currentPosition;
 
@@ -59,17 +55,18 @@ class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
+    super(segment, maxEntrySize, index, namespace);
     this.channel = channel;
-    this.segment = segment;
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
     memory.limit(0);
-    this.namespace = namespace;
-    this.firstIndex = segment.index();
     reset(0);
   }
 
+  @Override
+  MappedByteBuffer buffer() {
+    return null;
+  }
+
   @Override
   public void reset(long index) {
     long nextIndex = firstIndex;
@@ -217,11 +214,6 @@ class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
     return (Indexed<T>) indexedEntry;
   }
 
-  @Override
-  public void commit(long index) {
-
-  }
-
   @Override
   public void truncate(long index) {
     // If the index is greater than or equal to the last index, skip the truncate.
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
new file mode 100644 (file)
index 0000000..bbdf8f2
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.MappedByteBuffer;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+
+abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
+        permits FileChannelJournalSegmentWriter, MappedJournalSegmentWriter {
+    final @NonNull JournalSegment<E> segment;
+    final int maxEntrySize;
+    final @NonNull JournalIndex index;
+    final @NonNull JournalSerdes namespace;
+    final long firstIndex;
+
+    JournalSegmentWriter(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
+            final JournalSerdes namespace) {
+        this.segment = requireNonNull(segment);
+        this.maxEntrySize = maxEntrySize;
+        this.index = requireNonNull(index);
+        this.namespace = requireNonNull(namespace);
+        this.firstIndex = segment.index();
+    }
+
+    @Override
+    public final void commit(final long index) {
+        // FIXME: CONTROLLER-2098: eliminate the need for this method
+    }
+
+    /**
+     * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
+     * buffer.
+     *
+     * @return the mapped buffer underlying the segment writer, or {@code null}.
+     */
+    abstract @Nullable MappedByteBuffer buffer();
+}
index f19783a9c49901c4619c445fda0d67967485ddda..df577db3d2552979b9695d47389efa722d3596ef 100644 (file)
@@ -23,13 +23,13 @@ import java.nio.channels.FileChannel;
 /**
  * Mappable log segment writer.
  */
-class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
+final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
   private final FileChannel channel;
   private final JournalSegment<E> segment;
   private final int maxEntrySize;
   private final JournalIndex index;
   private final JournalSerdes namespace;
-  private JournalWriter<E> writer;
+  private JournalSegmentWriter<E> writer;
 
   MappableJournalSegmentWriter(
       FileChannel channel,
@@ -56,7 +56,7 @@ class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
     }
 
     try {
-      JournalWriter<E> writer = this.writer;
+      JournalSegmentWriter<E> writer = this.writer;
       MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
       this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
       writer.close();
@@ -71,18 +71,14 @@ class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
    */
   void unmap() {
     if (writer instanceof MappedJournalSegmentWriter) {
-      JournalWriter<E> writer = this.writer;
+      JournalSegmentWriter<E> writer = this.writer;
       this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
       writer.close();
     }
   }
 
   MappedByteBuffer buffer() {
-    JournalWriter<E> writer = this.writer;
-    if (writer instanceof MappedJournalSegmentWriter) {
-      return ((MappedJournalSegmentWriter<E>) writer).buffer();
-    }
-    return null;
+    return writer.buffer();
   }
 
   /**
index bc74e8b0ef899f99a17120d34447b1be60f2dd11..e1b1c196d89bd0f4fb8310e49ffba0fb82affa5c 100644 (file)
@@ -15,6 +15,8 @@
  */
 package io.atomix.storage.journal;
 
+import static java.util.Objects.requireNonNull;
+
 import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
 
@@ -24,6 +26,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * Segment writer.
@@ -40,14 +43,10 @@ import java.util.zip.CRC32;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-class MappedJournalSegmentWriter<E> implements JournalWriter<E> {
-  private final MappedByteBuffer mappedBuffer;
+final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+  private final @NonNull MappedByteBuffer mappedBuffer;
   private final ByteBuffer buffer;
-  private final JournalSegment<E> segment;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
-  private final long firstIndex;
+
   private Indexed<E> lastEntry;
 
   MappedJournalSegmentWriter(
@@ -56,22 +55,14 @@ class MappedJournalSegmentWriter<E> implements JournalWriter<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
-    this.mappedBuffer = buffer;
+    super(segment, maxEntrySize, index, namespace);
+    this.mappedBuffer = requireNonNull(buffer);
     this.buffer = buffer.slice();
-    this.segment = segment;
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
-    this.namespace = namespace;
-    this.firstIndex = segment.index();
     reset(0);
   }
 
-  /**
-   * Returns the mapped buffer underlying the segment writer.
-   *
-   * @return the mapped buffer underlying the segment writer
-   */
-  MappedByteBuffer buffer() {
+  @Override
+  @NonNull MappedByteBuffer buffer() {
     return mappedBuffer;
   }
 
@@ -216,10 +207,6 @@ class MappedJournalSegmentWriter<E> implements JournalWriter<E> {
     return (Indexed<T>) indexedEntry;
   }
 
-  @Override
-  public void commit(long index) {
-
-  }
 
   @Override
   public void truncate(long index) {