Eliminate MappableJournalSegmentReader 88/110588/4
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 10:13:05 +0000 (11:13 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
MappableJournalSegmentReader is an indirection used by
JournalSegmentReader to get a stable reader and the backend is
manipulated via map()/unmap() operations.

This is sort of a band-aid to lifecycle, as JournalSegment.close()
eagerly closes the writer, which in turn invalidates buffer and all that
jazz.

This dance is quite unnecessary, as JournalSegmentReader always acquires
the Segment, hence as long as it has a reader, the segment is guaranteed
to be mapped -- except for the case of close().

This patch reworks the logic in couple of ways:
- references are now guarding the writer itself, so as long as there are
  some, the writer is not closed.
- createReader() now increments the reference count, which means we can
  directly allocate the appopriate JournalSegmentReader implementation
- JournalSegmentReader.close() now routes to
  JournalSegment.closeReader(), which in turn releases the reference

JIRA: CONTROLLER-2098
Change-Id: I29b926dec5dc1a77e7e5e51ee3e2deb4f0ca6e33
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java [deleted file]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java

index f67b4cc1b7d2bc2db1d04469a2f5f18015105666..f5e1b83bbf3473a12a9a32d6422bb7c6da6ffcac 100644 (file)
@@ -18,7 +18,6 @@ package io.atomix.storage.journal;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.SparseJournalIndex;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -42,7 +41,7 @@ final class JournalSegment<E> implements AutoCloseable {
   private final JournalIndex index;
   private final JournalSerdes namespace;
   private final MappableJournalSegmentWriter<E> writer;
-  private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
+  private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
   private final FileChannel channel;
   private boolean open = true;
@@ -158,8 +157,8 @@ final class JournalSegment<E> implements AutoCloseable {
    * Acquires a reference to the log segment.
    */
   void acquire() {
-    if (references.getAndIncrement() == 0 && open) {
-      map();
+    if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
+      writer.map();
     }
   }
 
@@ -167,28 +166,13 @@ final class JournalSegment<E> implements AutoCloseable {
    * Releases a reference to the log segment.
    */
   void release() {
-    if (references.decrementAndGet() == 0 && open) {
-      unmap();
-    }
-  }
-
-  /**
-   * Maps the log segment into memory.
-   */
-  private void map() {
-    if (storageLevel == StorageLevel.MAPPED) {
-      MappedByteBuffer buffer = writer.map();
-      readers.forEach(reader -> reader.map(buffer));
-    }
-  }
-
-  /**
-   * Unmaps the log segment from memory.
-   */
-  private void unmap() {
-    if (storageLevel == StorageLevel.MAPPED) {
-      writer.unmap();
-      readers.forEach(reader -> reader.unmap());
+    if (references.decrementAndGet() == 0) {
+      if (storageLevel == StorageLevel.MAPPED) {
+        writer.unmap();
+      }
+      if (!open) {
+        finishClose();
+      }
     }
   }
 
@@ -207,14 +191,14 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @return A new segment reader.
    */
-  MappableJournalSegmentReader<E> createReader() {
+  JournalSegmentReader<E> createReader() {
     checkOpen();
-    MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index,
-        namespace);
-    MappedByteBuffer buffer = writer.buffer();
-    if (buffer != null) {
-      reader.map(buffer);
-    }
+    acquire();
+
+    final var buffer = writer.buffer();
+    final var reader = buffer == null
+      ? new FileChannelJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
+        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
     readers.add(reader);
     return reader;
   }
@@ -224,8 +208,10 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @param reader the closed segment reader
    */
-  void closeReader(MappableJournalSegmentReader<E> reader) {
-    readers.remove(reader);
+  void closeReader(JournalSegmentReader<E> reader) {
+    if (readers.remove(reader)) {
+      release();
+    }
   }
 
   /**
@@ -249,10 +235,19 @@ final class JournalSegment<E> implements AutoCloseable {
    */
   @Override
   public void close() {
-    unmap();
-    writer.close();
-    readers.forEach(reader -> reader.close());
+    if (!open) {
+      return;
+    }
+
     open = false;
+    readers.forEach(JournalSegmentReader::close);
+    if (references.get() == 0) {
+      finishClose();
+    }
+  }
+
+  private void finishClose() {
+    writer.close();
     try {
       channel.close();
     } catch (IOException e) {
index 512e36a1c57e5c9fe56aacc6fe0997f63e2fcbb6..2dddd9827633cb92b15cd30d1e3be245557c8fb0 100644 (file)
@@ -20,12 +20,14 @@ abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
     private final JournalIndex index;
     final JournalSerdes namespace;
     private final long firstIndex;
+    private final JournalSegment<E> segment;
 
     private Indexed<E> currentEntry;
     private Indexed<E> nextEntry;
 
     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
             final JournalSerdes namespace) {
+        this.segment = requireNonNull(segment);
         this.maxEntrySize = maxEntrySize;
         this.index = requireNonNull(index);
         this.namespace = requireNonNull(namespace);
@@ -100,7 +102,7 @@ abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
 
     @Override
     public final void close() {
-        // FIXME: CONTROLLER-2098: remove this method
+        segment.closeReader(this);
     }
 
     /**
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java
deleted file mode 100644 (file)
index c70b36b..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.storage.journal;
-
-import io.atomix.storage.journal.index.JournalIndex;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-/**
- * Mappable log segment reader.
- */
-final class MappableJournalSegmentReader<E> implements JournalReader<E> {
-  private final JournalSegment<E> segment;
-  private final FileChannel channel;
-  private final int maxEntrySize;
-  private final JournalIndex index;
-  private final JournalSerdes namespace;
-  private JournalSegmentReader<E> reader;
-
-  MappableJournalSegmentReader(
-      FileChannel channel,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalIndex index,
-      JournalSerdes namespace) {
-    this.channel = channel;
-    this.segment = segment;
-    this.maxEntrySize = maxEntrySize;
-    this.index = index;
-    this.namespace = namespace;
-    this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace);
-  }
-
-  /**
-   * Converts the reader to a mapped reader using the given buffer.
-   *
-   * @param buffer the mapped buffer
-   */
-  void map(ByteBuffer buffer) {
-    if (!(reader instanceof MappedJournalSegmentReader)) {
-      JournalReader<E> reader = this.reader;
-      this.reader = new MappedJournalSegmentReader<>(buffer, segment, maxEntrySize, index, namespace);
-      this.reader.reset(reader.getNextIndex());
-      reader.close();
-    }
-  }
-
-  /**
-   * Converts the reader to an unmapped reader.
-   */
-  void unmap() {
-    if (reader instanceof MappedJournalSegmentReader) {
-      JournalReader<E> reader = this.reader;
-      this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace);
-      this.reader.reset(reader.getNextIndex());
-      reader.close();
-    }
-  }
-
-  @Override
-  public long getFirstIndex() {
-    return reader.getFirstIndex();
-  }
-
-  @Override
-  public long getCurrentIndex() {
-    return reader.getCurrentIndex();
-  }
-
-  @Override
-  public Indexed<E> getCurrentEntry() {
-    return reader.getCurrentEntry();
-  }
-
-  @Override
-  public long getNextIndex() {
-    return reader.getNextIndex();
-  }
-
-  @Override
-  public boolean hasNext() {
-    return reader.hasNext();
-  }
-
-  @Override
-  public Indexed<E> next() {
-    return reader.next();
-  }
-
-  @Override
-  public void reset() {
-    reader.reset();
-  }
-
-  @Override
-  public void reset(long index) {
-    reader.reset(index);
-  }
-
-  @Override
-  public void close() {
-    reader.close();
-    segment.closeReader(this);
-  }
-}
index d5201b3c26da95c6c503d3a9df3910ebba73338d..f9e015f6867148046dfdf2d333179b9fa716dd04 100644 (file)
@@ -24,14 +24,13 @@ public final class SegmentedJournalReader<E> implements JournalReader<E> {
   private final SegmentedJournal<E> journal;
   private JournalSegment<E> currentSegment;
   private Indexed<E> previousEntry;
-  private MappableJournalSegmentReader<E> currentReader;
+  private JournalSegmentReader<E> currentReader;
   private final Mode mode;
 
   SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
     this.journal = journal;
     this.mode = mode;
     currentSegment = journal.getSegment(index);
-    currentSegment.acquire();
     currentReader = currentSegment.createReader();
 
     long nextIndex = getNextIndex();
@@ -76,10 +75,8 @@ public final class SegmentedJournalReader<E> implements JournalReader<E> {
   public void reset() {
     previousEntry = null;
     currentReader.close();
-    currentSegment.release();
 
     currentSegment = journal.getFirstSegment();
-    currentSegment.acquire();
     currentReader = currentSegment.createReader();
   }
 
@@ -107,10 +104,8 @@ public final class SegmentedJournalReader<E> implements JournalReader<E> {
       JournalSegment<E> segment = journal.getSegment(index - 1);
       if (segment != null) {
         currentReader.close();
-        currentSegment.release();
 
         currentSegment = segment;
-        currentSegment.acquire();
         currentReader = currentSegment.createReader();
       }
     }
@@ -172,10 +167,8 @@ public final class SegmentedJournalReader<E> implements JournalReader<E> {
 
     previousEntry = currentReader.getCurrentEntry();
     currentReader.close();
-    currentSegment.release();
 
     currentSegment = nextSegment;
-    currentSegment.acquire();
     currentReader = currentSegment.createReader();
     return true;
   }