Share FileChannel across all JournalSegmentReaders 79/110579/2
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Mar 2024 18:00:39 +0000 (19:00 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
Now that neither the writer not reader manipulates FileChannel state, we
can share a single channel. This has the nice effect of lifting its
lifecycle management completely to JournalSegment.

JIRA: CONTROLLER-2096
Change-Id: Ib7653d1494fdb53d1a1c73ad20ab103b29fedaa0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java

index 9a75973e7d7a48e21d1bd58942d913a4db791ceb..48f6ead66ce02439ec7d0f6877651811736434f7 100644 (file)
@@ -17,7 +17,6 @@ package io.atomix.storage.journal;
 
 import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.SparseJournalIndex;
-import java.io.File;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -45,6 +44,7 @@ public class JournalSegment<E> implements AutoCloseable {
   private final MappableJournalSegmentWriter<E> writer;
   private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
+  private final FileChannel channel;
   private boolean open = true;
 
   public JournalSegment(
@@ -58,17 +58,15 @@ public class JournalSegment<E> implements AutoCloseable {
     this.descriptor = descriptor;
     this.storageLevel = storageLevel;
     this.maxEntrySize = maxEntrySize;
-    this.index = new SparseJournalIndex(indexDensity);
     this.namespace = namespace;
-    this.writer = new MappableJournalSegmentWriter<>(openChannel(file.file()), this, maxEntrySize, index, namespace);
-  }
-
-  private FileChannel openChannel(File file) {
+    index = new SparseJournalIndex(indexDensity);
     try {
-      return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
+      channel = FileChannel.open(file.file().toPath(),
+        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
     } catch (IOException e) {
       throw new StorageException(e);
     }
+    writer = new MappableJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
   }
 
   /**
@@ -113,7 +111,11 @@ public class JournalSegment<E> implements AutoCloseable {
    * @return the size of the segment
    */
   public int size() {
-    return writer.size();
+    try {
+      return (int) channel.size();
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
   }
 
   /**
@@ -207,8 +209,8 @@ public class JournalSegment<E> implements AutoCloseable {
    */
   MappableJournalSegmentReader<E> createReader() {
     checkOpen();
-    MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(
-        openChannel(file.file()), this, maxEntrySize, index, namespace);
+    MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index,
+        namespace);
     MappedByteBuffer buffer = writer.buffer();
     if (buffer != null) {
       reader.map(buffer);
@@ -251,6 +253,11 @@ public class JournalSegment<E> implements AutoCloseable {
     writer.close();
     readers.forEach(reader -> reader.close());
     open = false;
+    try {
+      channel.close();
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
   }
 
   /**
index b0d1b5e3ebe40ee755c6415b8ad27f2a39091a36..c70b36bb35977c6a8653195f5105167aeaf7c294 100644 (file)
@@ -16,7 +16,6 @@
 package io.atomix.storage.journal;
 
 import io.atomix.storage.journal.index.JournalIndex;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -114,12 +113,6 @@ final class MappableJournalSegmentReader<E> implements JournalReader<E> {
   @Override
   public void close() {
     reader.close();
-    try {
-      channel.close();
-    } catch (IOException e) {
-      throw new StorageException(e);
-    } finally {
-      segment.closeReader(this);
-    }
+    segment.closeReader(this);
   }
 }
index 10784dc49ee38427318b0c43f2852285a7369252..014ff63a6ac30f32ca1f8c48aa7cb0fb328ca056 100644 (file)
@@ -16,7 +16,6 @@
 package io.atomix.storage.journal;
 
 import io.atomix.storage.journal.index.JournalIndex;
-import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -24,7 +23,6 @@ import java.nio.channels.FileChannel;
  * Mappable log segment writer.
  */
 final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
-  private final FileChannel channel;
   private final JournalSegment<E> segment;
   private JournalSegmentWriter<E> writer;
 
@@ -34,7 +32,6 @@ final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
-    this.channel = channel;
     this.segment = segment;
     this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
   }
@@ -70,19 +67,6 @@ final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
     return segment.index();
   }
 
-  /**
-   * Returns the size of the segment.
-   *
-   * @return the size of the segment
-   */
-  public int size() {
-    try {
-      return (int) channel.size();
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
   @Override
   public long getLastIndex() {
     return writer.getLastIndex();
@@ -131,10 +115,5 @@ final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
   @Override
   public void close() {
     writer.close();
-    try {
-      channel.close();
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
   }
 }