Move JournalSegmentWriter switchover logic 75/110575/5
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Mar 2024 11:53:21 +0000 (12:53 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
This patch moves the logic to JournalSegmentWriter implementations,
which allows us to communicate internal state without the need to
re-establish it or leak it to the outside world.

JIRA: CONTROLLER-2043
Change-Id: Ifcb937fdaeaecd46b53a962c541ebfff689ecd40
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index fa8b02bded1cdabff58370cf66e12aa12a9318da..75bd9ac7e47483b062be62ebb81f678971fa9f1f 100644 (file)
@@ -44,7 +44,6 @@ import java.util.zip.CRC32;
 final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
 
-  private final FileChannel channel;
   private final ByteBuffer memory;
   private Indexed<E> lastEntry;
   private long currentPosition;
@@ -55,8 +54,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
-    super(segment, maxEntrySize, index, namespace);
-    this.channel = channel;
+    super(channel, segment, maxEntrySize, index, namespace);
     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
     memory.limit(0);
     reset(0);
@@ -67,6 +65,16 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     return null;
   }
 
+  @Override
+  MappedJournalSegmentWriter<E> toMapped() {
+    return new MappedJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+  }
+
+  @Override
+  FileChannelJournalSegmentWriter<E> toFileChannel() {
+    return this;
+  }
+
   @Override
   public void reset(long index) {
     long nextIndex = firstIndex;
index bbdf8f2d0862b7e5376fe751fe46de89c6d1bfef..b3c4fd04988f7ec9c9eaa68c794ad96c88f40b04 100644 (file)
@@ -11,19 +11,22 @@ import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.index.JournalIndex;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
         permits FileChannelJournalSegmentWriter, MappedJournalSegmentWriter {
+    final @NonNull FileChannel channel;
     final @NonNull JournalSegment<E> segment;
     final int maxEntrySize;
     final @NonNull JournalIndex index;
     final @NonNull JournalSerdes namespace;
     final long firstIndex;
 
-    JournalSegmentWriter(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
-            final JournalSerdes namespace) {
+    JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+            final JournalIndex index, final JournalSerdes namespace) {
+        this.channel = requireNonNull(channel);
         this.segment = requireNonNull(segment);
         this.maxEntrySize = maxEntrySize;
         this.index = requireNonNull(index);
@@ -43,4 +46,8 @@ abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
      * @return the mapped buffer underlying the segment writer, or {@code null}.
      */
     abstract @Nullable MappedByteBuffer buffer();
+
+    abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+
+    abstract @NonNull FileChannelJournalSegmentWriter<E> toFileChannel();
 }
index df577db3d2552979b9695d47389efa722d3596ef..10784dc49ee38427318b0c43f2852285a7369252 100644 (file)
@@ -26,9 +26,6 @@ import java.nio.channels.FileChannel;
 final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
   private final FileChannel channel;
   private final JournalSegment<E> segment;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
   private JournalSegmentWriter<E> writer;
 
   MappableJournalSegmentWriter(
@@ -39,9 +36,6 @@ final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
       JournalSerdes namespace) {
     this.channel = channel;
     this.segment = segment;
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
-    this.namespace = namespace;
     this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
   }
 
@@ -51,30 +45,16 @@ final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
    * @return the buffer that was mapped into memory
    */
   MappedByteBuffer map() {
-    if (writer instanceof MappedJournalSegmentWriter) {
-      return ((MappedJournalSegmentWriter<E>) writer).buffer();
-    }
-
-    try {
-      JournalSegmentWriter<E> writer = this.writer;
-      MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
-      this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
-      writer.close();
-      return buffer;
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
+    final var mapped = writer.toMapped();
+    writer = mapped;
+    return mapped.buffer();
   }
 
   /**
    * Unmaps the mapped buffer.
    */
   void unmap() {
-    if (writer instanceof MappedJournalSegmentWriter) {
-      JournalSegmentWriter<E> writer = this.writer;
-      this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
-      writer.close();
-    }
+    writer = writer.toFileChannel();
   }
 
   MappedByteBuffer buffer() {
index e1b1c196d89bd0f4fb8310e49ffba0fb82affa5c..f5d4ed9d04a9554de4ec935c6b54b85720158973 100644 (file)
@@ -15,8 +15,6 @@
  */
 package io.atomix.storage.journal;
 
-import static java.util.Objects.requireNonNull;
-
 import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
 
@@ -25,6 +23,7 @@ import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 
@@ -50,14 +49,18 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private Indexed<E> lastEntry;
 
   MappedJournalSegmentWriter(
-      MappedByteBuffer buffer,
+      FileChannel channel,
       JournalSegment<E> segment,
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
-    super(segment, maxEntrySize, index, namespace);
-    this.mappedBuffer = requireNonNull(buffer);
-    this.buffer = buffer.slice();
+    super(channel, segment, maxEntrySize, index, namespace);
+    try {
+      mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+    this.buffer = mappedBuffer.slice();
     reset(0);
   }
 
@@ -66,6 +69,17 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     return mappedBuffer;
   }
 
+  @Override
+  MappedJournalSegmentWriter<E> toMapped() {
+    return this;
+  }
+
+  @Override
+  FileChannelJournalSegmentWriter<E> toFileChannel() {
+    close();
+    return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+  }
+
   @Override
   public void reset(long index) {
     long nextIndex = firstIndex;