Eliminate MappableJournalSegmentReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
index f67b4cc1b7d2bc2db1d04469a2f5f18015105666..f5e1b83bbf3473a12a9a32d6422bb7c6da6ffcac 100644 (file)
@@ -18,7 +18,6 @@ package io.atomix.storage.journal;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.SparseJournalIndex;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -42,7 +41,7 @@ final class JournalSegment<E> implements AutoCloseable {
   private final JournalIndex index;
   private final JournalSerdes namespace;
   private final MappableJournalSegmentWriter<E> writer;
-  private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
+  private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
   private final FileChannel channel;
   private boolean open = true;
@@ -158,8 +157,8 @@ final class JournalSegment<E> implements AutoCloseable {
    * Acquires a reference to the log segment.
    */
   void acquire() {
-    if (references.getAndIncrement() == 0 && open) {
-      map();
+    if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
+      writer.map();
     }
   }
 
@@ -167,28 +166,13 @@ final class JournalSegment<E> implements AutoCloseable {
    * Releases a reference to the log segment.
    */
   void release() {
-    if (references.decrementAndGet() == 0 && open) {
-      unmap();
-    }
-  }
-
-  /**
-   * Maps the log segment into memory.
-   */
-  private void map() {
-    if (storageLevel == StorageLevel.MAPPED) {
-      MappedByteBuffer buffer = writer.map();
-      readers.forEach(reader -> reader.map(buffer));
-    }
-  }
-
-  /**
-   * Unmaps the log segment from memory.
-   */
-  private void unmap() {
-    if (storageLevel == StorageLevel.MAPPED) {
-      writer.unmap();
-      readers.forEach(reader -> reader.unmap());
+    if (references.decrementAndGet() == 0) {
+      if (storageLevel == StorageLevel.MAPPED) {
+        writer.unmap();
+      }
+      if (!open) {
+        finishClose();
+      }
     }
   }
 
@@ -207,14 +191,14 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @return A new segment reader.
    */
-  MappableJournalSegmentReader<E> createReader() {
+  JournalSegmentReader<E> createReader() {
     checkOpen();
-    MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index,
-        namespace);
-    MappedByteBuffer buffer = writer.buffer();
-    if (buffer != null) {
-      reader.map(buffer);
-    }
+    acquire();
+
+    final var buffer = writer.buffer();
+    final var reader = buffer == null
+      ? new FileChannelJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
+        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
     readers.add(reader);
     return reader;
   }
@@ -224,8 +208,10 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @param reader the closed segment reader
    */
-  void closeReader(MappableJournalSegmentReader<E> reader) {
-    readers.remove(reader);
+  void closeReader(JournalSegmentReader<E> reader) {
+    if (readers.remove(reader)) {
+      release();
+    }
   }
 
   /**
@@ -249,10 +235,19 @@ final class JournalSegment<E> implements AutoCloseable {
    */
   @Override
   public void close() {
-    unmap();
-    writer.close();
-    readers.forEach(reader -> reader.close());
+    if (!open) {
+      return;
+    }
+
     open = false;
+    readers.forEach(JournalSegmentReader::close);
+    if (references.get() == 0) {
+      finishClose();
+    }
+  }
+
+  private void finishClose() {
+    writer.close();
     try {
       channel.close();
     } catch (IOException e) {