Separate out FileAccess 34/111634/14
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 6 May 2024 03:23:11 +0000 (05:23 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 9 May 2024 12:36:23 +0000 (12:36 +0000)
Rather than keeping mapping in MappedFileWriter, use MappedFileAccess
to keep its lifecycle.

The second part is keeping the information about last position, which
we encapsulate in JournalSegment.State and its two specializations.

Also move ByteBuffer allocator to DiskFileAccess, as it really is
specific to StorageLevel.DISK.

JIRA: CONTROLLER-2099
Change-Id: I7fba8da21a021a477c965d96045c1b0d4bf8cc29
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileAccess.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java

diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java
new file mode 100644 (file)
index 0000000..75064b8
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import java.nio.ByteBuffer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * {@link FileAccess} for {@link StorageLevel#DISK}.
+ */
+@NonNullByDefault
+final class DiskFileAccess extends FileAccess {
+    /**
+     * Just do not bother with IO smaller than this many bytes.
+     */
+    private static final int MIN_IO_SIZE = 8192;
+
+    DiskFileAccess(final JournalSegmentFile file, final int maxEntrySize) {
+        super(file, maxEntrySize);
+    }
+
+    @Override
+    DiskFileReader newFileReader() {
+        return new DiskFileReader(file, allocateBuffer(maxEntrySize, file.maxSize()));
+    }
+
+    @Override
+    DiskFileWriter newFileWriter() {
+        return new DiskFileWriter(file, maxEntrySize, allocateBuffer(maxEntrySize, file.maxSize()));
+    }
+
+    @Override
+    public void close() {
+        // No-op
+    }
+
+    private static ByteBuffer allocateBuffer(final int maxEntrySize, final int maxSegmentSize) {
+        return ByteBuffer.allocate(chooseBufferSize(maxEntrySize, maxSegmentSize));
+    }
+
+    private static int chooseBufferSize(final int maxEntrySize, final int maxSegmentSize) {
+        if (maxSegmentSize <= MIN_IO_SIZE) {
+            // just buffer the entire segment
+            return maxSegmentSize;
+        }
+
+        // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
+        final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
+        return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
+    }
+}
index 55f95f4d51a8b933c8c147b4b7c928b537f5e1b4..d5795eb98064f99ed12a747b44b26bf801608f9f 100644 (file)
@@ -32,10 +32,6 @@ final class DiskFileReader extends FileReader {
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
-    DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) {
-        this(file, file.allocateBuffer(maxEntrySize));
-    }
-
     // Note: take ownership of the buffer
     DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
         super(file);
index 0cc6454c2b314ed85ea8e14af7b30ee415dfeb65..687da045a3e845f40b347151b38bba3bbe248e3e 100644 (file)
 package io.atomix.storage.journal;
 
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
@@ -32,10 +32,10 @@ final class DiskFileWriter extends FileWriter {
     private final FileChannel channel;
     private final ByteBuffer buffer;
 
-    DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+    DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer) {
         super(file, maxEntrySize);
+        this.buffer = requireNonNull(buffer);
         channel = file.channel();
-        buffer = file.allocateBuffer(maxEntrySize);
         reader = new DiskFileReader(file, buffer);
     }
 
@@ -44,22 +44,6 @@ final class DiskFileWriter extends FileWriter {
         return reader;
     }
 
-    @Override
-    MappedByteBuffer buffer() {
-        return null;
-    }
-
-    @Override
-    MappedFileWriter toMapped() {
-        flush();
-        return new MappedFileWriter(file, maxEntrySize);
-    }
-
-    @Override
-    DiskFileWriter toDisk() {
-        return null;
-    }
-
     @Override
     void writeEmptyHeader(final int position) {
         try {
@@ -93,9 +77,4 @@ final class DiskFileWriter extends FileWriter {
             }
         }
     }
-
-    @Override
-    void close() {
-        flush();
-    }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileAccess.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileAccess.java
new file mode 100644 (file)
index 0000000..02c3674
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Abstract base class for accessing a particular file.
+ */
+@NonNullByDefault
+abstract sealed class FileAccess implements AutoCloseable permits DiskFileAccess, MappedFileAccess {
+    final JournalSegmentFile file;
+    final int maxEntrySize;
+
+    FileAccess(final JournalSegmentFile file, final int maxEntrySize) {
+        this.file = requireNonNull(file);
+        this.maxEntrySize = maxEntrySize;
+    }
+
+    /**
+     * Create a new {@link FileReader}.
+     *
+     * @return a new {@link FileReader}
+     */
+    abstract FileReader newFileReader();
+
+    /**
+     * Create a new {@link FileWriter}.
+     *
+     * @return a new {@link FileWriter}
+     */
+    abstract FileWriter newFileWriter();
+
+    @Override
+    public abstract void close();
+}
index fc9ef64fe3097eb87e194ab4f1e03611ba96de79..262f248ec18dfb2ed5dcca7adc3f45830895809d 100644 (file)
@@ -18,22 +18,29 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * An abstraction over how to write a {@link JournalSegmentFile}.
  */
 abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
-    final JournalSegmentFile file;
-    final int maxEntrySize;
+    private final JournalSegmentFile file;
+    private final int maxEntrySize;
 
     FileWriter(final JournalSegmentFile file, final int maxEntrySize) {
         this.file = requireNonNull(file);
         this.maxEntrySize = maxEntrySize;
     }
 
+    final JournalSegmentFile file() {
+        return file;
+    }
+
+    final int maxEntrySize() {
+        return maxEntrySize;
+    }
+
     /**
      * Return the internal {@link FileReader}.
      *
@@ -63,21 +70,10 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
     /**
      * Flushes written entries to disk.
      */
-    abstract void flush();
-
-    /**
-     * Closes this writer.
-     */
-    abstract void close();
+    abstract void flush() throws IOException;
 
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
     }
-
-    abstract @Nullable MappedByteBuffer buffer();
-
-    abstract @Nullable MappedFileWriter toMapped();
-
-    abstract @Nullable DiskFileWriter toDisk();
 }
index 3f7783ba44cda217ee718450b2c3036ddefbb30d..bab668ffd6982a6a3f2ec07bfef3cc5c9b904f35 100644 (file)
@@ -28,6 +28,8 @@ import java.nio.file.Files;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +40,52 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class JournalSegment {
+    /**
+     * Encapsulation of a {@link JournalSegment}'s state;
+     */
+    sealed interface State {
+        // Marker interface
+    }
+
+    /**
+     * Journal segment is active, i.e. there is a associated with it.
+     */
+    @NonNullByDefault
+    record Active(FileAccess access, JournalSegmentWriter writer) implements State {
+        Active {
+            requireNonNull(access);
+            requireNonNull(writer);
+        }
+
+        Inactive deactivate() {
+            final var inactive = new Inactive(writer.currentPosition());
+            access.close();
+            return inactive;
+        }
+    }
+
+    /**
+     * Journal segment is inactive, i.e. there is no writer associated with it.
+     */
+    @NonNullByDefault
+    record Inactive(int position) implements State {
+        Active activate(final JournalSegment segment) throws IOException {
+            final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
+            return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
+                this));
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
 
     private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
     private final AtomicInteger references = new AtomicInteger();
-    private final JournalSegmentFile file;
-    private final StorageLevel storageLevel;
+    private final @NonNull JournalSegmentFile file;
+    private final @NonNull StorageLevel storageLevel;
+    private final @NonNull JournalIndex journalIndex;
     private final int maxEntrySize;
-    private final JournalIndex journalIndex;
 
-    private JournalSegmentWriter writer;
+    private State state;
     private boolean open = true;
 
     JournalSegment(
@@ -58,18 +96,15 @@ final class JournalSegment {
         this.file = requireNonNull(file);
         this.storageLevel = requireNonNull(storageLevel);
         this.maxEntrySize = maxEntrySize;
-        journalIndex = new SparseJournalIndex(indexDensity);
 
-        final var fileWriter = switch (storageLevel) {
-            case DISK -> new DiskFileWriter(file, maxEntrySize);
-            case MAPPED -> new MappedFileWriter(file, maxEntrySize);
-        };
+        journalIndex = new SparseJournalIndex(indexDensity);
 
-        // Traverse all entries and push them to index -- thus reconstructing both last index and current position
-        writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
-            indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
-            // relinquish mapped memory
-            .toFileChannel();
+        try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
+            final var fileReader = tmpAccess.newFileReader();
+            state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 
     /**
@@ -113,10 +148,19 @@ final class JournalSegment {
     /**
      * Acquires a reference to the log segment.
      */
-    private void acquire() {
-        if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
-            writer = writer.toMapped();
+    private Active acquire() {
+        return references.getAndIncrement() == 0 ? activate() : (Active) state;
+    }
+
+    private Active activate() {
+        final Active ret;
+        try {
+            ret = ((Inactive) state).activate(this);
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
+        state = ret;
+        return ret;
     }
 
     /**
@@ -124,9 +168,7 @@ final class JournalSegment {
      */
     private void release() {
         if (references.decrementAndGet() == 0) {
-            if (storageLevel == StorageLevel.MAPPED) {
-                writer = writer.toFileChannel();
-            }
+            state = ((Active) state).deactivate();
             if (!open) {
                 finishClose();
             }
@@ -140,9 +182,7 @@ final class JournalSegment {
      */
     JournalSegmentWriter acquireWriter() {
         checkOpen();
-        acquire();
-
-        return writer;
+        return acquire().writer();
     }
 
     /**
@@ -159,12 +199,8 @@ final class JournalSegment {
      */
     JournalSegmentReader createReader() {
         checkOpen();
-        acquire();
 
-        final var buffer = writer.buffer();
-        final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
-            : new DiskFileReader(file, maxEntrySize);
-        final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+        final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
         reader.setPosition(JournalSegmentDescriptor.BYTES);
         readers.add(reader);
         return reader;
@@ -216,7 +252,6 @@ final class JournalSegment {
     }
 
     private void finishClose() {
-        writer.close();
         try {
             file.close();
         } catch (IOException e) {
@@ -246,12 +281,12 @@ final class JournalSegment {
             .toString();
     }
 
-    static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
-            final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+    static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+            final long maxNextIndex, final @Nullable Position start) {
         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
         final var fileReader = fileWriter.reader();
         try {
-            return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+            return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
         } finally {
             // Make sure reader does not see anything we've done
             fileReader.invalidateCache();
index 134b5233c05b586e4bb5089465e988fd1307f157..5b5a2da83fab71777d6576260895455f94a1e0c1 100644 (file)
@@ -21,8 +21,6 @@ import com.google.common.base.MoreObjects;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 import java.nio.file.Path;
@@ -37,10 +35,6 @@ final class JournalSegmentFile {
     private static final char PART_SEPARATOR = '-';
     private static final char EXTENSION_SEPARATOR = '.';
     private static final String EXTENSION = "log";
-    /**
-     * Just do not bother with IO smaller than this many bytes.
-     */
-    private static final int MIN_IO_SIZE = 8192;
 
     private final @NonNull JournalSegmentDescriptor descriptor;
     private final @NonNull Path path;
@@ -130,40 +124,29 @@ final class JournalSegmentFile {
     }
 
     /**
-     * Map the contents of the file into memory.
+     * Access this file using specified {@link StorageLevel} and maximum entry size.
      *
-     * @return A {@link MappedByteBuffer}
+     * @param level a {@link StorageLevel}
+     * @param maxEntrySize maximum size of stored entry
+     * @return A {@link MappedFileAccess}
      * @throws IOException if an I/O error occurs
      */
-    @NonNull MappedByteBuffer map() throws IOException {
-        return channel().map(MapMode.READ_WRITE, 0, maxSize());
+    @NonNull FileAccess newAccess(final StorageLevel level, final int maxEntrySize) throws IOException {
+        return switch (level) {
+            case DISK -> new DiskFileAccess(this, maxEntrySize);
+            case MAPPED -> new MappedFileAccess(this, maxEntrySize, channel().map(MapMode.READ_WRITE, 0, maxSize()));
+        };
     }
 
     void close() throws IOException {
         file.close();
     }
 
-    ByteBuffer allocateBuffer(final int maxEntrySize) {
-        return ByteBuffer.allocate(chooseBufferSize(maxEntrySize));
-    }
-
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
     }
 
-    private int chooseBufferSize(final int maxEntrySize) {
-        final int maxSegmentSize = maxSize();
-        if (maxSegmentSize <= MIN_IO_SIZE) {
-            // just buffer the entire segment
-            return maxSegmentSize;
-        }
-
-        // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
-        final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
-        return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
-    }
-
     /**
      * Returns a boolean value indicating whether the given file appears to be a parsable segment file.
      *
index 9c3da7a42e9e049089f98d3d9e1e8517fbed7bdf..626361308d29a4ff3a4dfd276948e832ebc46140 100644 (file)
@@ -18,12 +18,12 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
+import io.atomix.storage.journal.JournalSegment.Inactive;
 import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.netty.buffer.Unpooled;
 import java.io.EOFException;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
@@ -35,28 +35,27 @@ final class JournalSegmentWriter {
     private final FileWriter fileWriter;
     final @NonNull JournalSegment segment;
     private final @NonNull JournalIndex journalIndex;
-    final int maxSegmentSize;
-    final int maxEntrySize;
 
     private int currentPosition;
 
-    JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
-            final JournalIndex journalIndex, final int currentPosition) {
+    JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+            final int currentPosition) {
         this.fileWriter = requireNonNull(fileWriter);
         this.segment = requireNonNull(segment);
         this.journalIndex = requireNonNull(journalIndex);
-        this.maxEntrySize = maxEntrySize;
         this.currentPosition = currentPosition;
-        maxSegmentSize = segment.file().maxSize();
     }
 
-    JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
-        segment = previous.segment;
-        journalIndex = previous.journalIndex;
-        maxSegmentSize = previous.maxSegmentSize;
-        maxEntrySize = previous.maxEntrySize;
-        currentPosition = previous.currentPosition;
+    JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+            final Inactive segmentState) {
         this.fileWriter = requireNonNull(fileWriter);
+        this.segment = requireNonNull(segment);
+        this.journalIndex = requireNonNull(journalIndex);
+        currentPosition = segmentState.position();
+    }
+
+    int currentPosition() {
+        return currentPosition;
     }
 
     /**
@@ -84,7 +83,7 @@ final class JournalSegmentWriter {
         // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
         // way smaller than that.
         final int bodyPosition = position + HEADER_BYTES;
-        final int avail = maxSegmentSize - bodyPosition;
+        final int avail = segment.file().maxSize() - bodyPosition;
         if (avail <= 0) {
             // we do not have enough space for the header and a byte: signal a retry
             LOG.trace("Not enough space for {} at {}", index, position);
@@ -92,6 +91,7 @@ final class JournalSegmentWriter {
         }
 
         // Entry must not exceed maxEntrySize
+        final var maxEntrySize = fileWriter.maxEntrySize();
         final var writeLimit = Math.min(avail, maxEntrySize);
 
         // Allocate entry space
@@ -146,7 +146,7 @@ final class JournalSegmentWriter {
 
         currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
             // recover position and last written
-            : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
+            : JournalSegment.indexEntries(fileWriter, segment, journalIndex, index, nearest);
 
         // Zero the entry header at current channel position.
         fileWriter.writeEmptyHeader(currentPosition);
@@ -156,33 +156,10 @@ final class JournalSegmentWriter {
      * Flushes written entries to disk.
      */
     void flush() {
-        fileWriter.flush();
-    }
-
-    /**
-     * Closes this writer.
-     */
-    void close() {
-        fileWriter.close();
-    }
-
-    /**
-     * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
-     * buffer.
-     *
-     * @return the mapped buffer underlying the segment writer, or {@code null}.
-     */
-    @Nullable MappedByteBuffer buffer() {
-        return fileWriter.buffer();
-    }
-
-    @NonNull JournalSegmentWriter toMapped() {
-        final var newWriter = fileWriter.toMapped();
-        return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
-    }
-
-    @NonNull JournalSegmentWriter toFileChannel() {
-        final var newWriter = fileWriter.toDisk();
-        return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+        try {
+            fileWriter.flush();
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java
new file mode 100644 (file)
index 0000000..7759764
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.util.internal.PlatformDependent;
+import java.io.UncheckedIOException;
+import java.nio.MappedByteBuffer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * {@link FileAccess} for {@link StorageLevel#MAPPED}.
+ */
+@NonNullByDefault
+final class MappedFileAccess extends FileAccess {
+    private final MappedByteBuffer mappedBuffer;
+
+    MappedFileAccess(final JournalSegmentFile file, final int maxEntrySize, final MappedByteBuffer mappedBuffer) {
+        super(file, maxEntrySize);
+        this.mappedBuffer = requireNonNull(mappedBuffer);
+    }
+
+    @Override
+    MappedFileReader newFileReader() {
+        return new MappedFileReader(file, mappedBuffer.slice());
+    }
+
+    @Override
+    MappedFileWriter newFileWriter() {
+        return new MappedFileWriter(file, maxEntrySize, mappedBuffer.slice(), () -> {
+           try {
+               mappedBuffer.force();
+           } catch (UncheckedIOException e) {
+               throw e.getCause();
+           }
+        });
+    }
+
+    @Override
+    public void close() {
+        PlatformDependent.freeDirectBuffer(mappedBuffer);
+    }
+}
index 9d129f89a18df3ab696ab1e954e7b323924f25e4..3eca4173dbf48ac73aa273fe360f974993823783 100644 (file)
@@ -25,7 +25,7 @@ final class MappedFileReader extends FileReader {
 
     MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
         super(file);
-        this.buffer = buffer.slice().asReadOnlyBuffer();
+        this.buffer = buffer.asReadOnlyBuffer();
     }
 
     @Override
index a8877fabf7dfe30abf530da71bc9003f0146aa24..589f4d776c246f1bb2e5a9ed316d8e5572e4386d 100644 (file)
  */
 package io.atomix.storage.journal;
 
-import io.netty.util.internal.PlatformDependent;
+import static java.util.Objects.requireNonNull;
+
+import java.io.Flushable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * A {@link StorageLevel#MAPPED} {@link FileWriter}.
  */
 final class MappedFileWriter extends FileWriter {
-    private final @NonNull MappedByteBuffer mappedBuffer;
     private final MappedFileReader reader;
     private final ByteBuffer buffer;
+    private final Flushable flush;
 
-    MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+    MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer,
+            final Flushable flush) {
         super(file, maxEntrySize);
-
-        try {
-            mappedBuffer = file.map();
-        } catch (IOException e) {
-            throw new StorageException(e);
-        }
-        buffer = mappedBuffer.slice();
-        reader = new MappedFileReader(file, mappedBuffer);
+        this.buffer = requireNonNull(buffer);
+        this.flush = requireNonNull(flush);
+        reader = new MappedFileReader(file, buffer);
     }
 
     @Override
@@ -46,22 +42,6 @@ final class MappedFileWriter extends FileWriter {
         return reader;
     }
 
-    @Override
-    MappedByteBuffer buffer() {
-        return mappedBuffer;
-    }
-
-    @Override
-    MappedFileWriter toMapped() {
-        return null;
-    }
-
-    @Override
-    DiskFileWriter toDisk() {
-        close();
-        return new DiskFileWriter(file, maxEntrySize);
-    }
-
     @Override
     void writeEmptyHeader(final int position) {
         // Note: we issue a single putLong() instead of two putInt()s.
@@ -79,13 +59,7 @@ final class MappedFileWriter extends FileWriter {
     }
 
     @Override
-    void flush() {
-        mappedBuffer.force();
-    }
-
-    @Override
-    void close() {
-        flush();
-        PlatformDependent.freeDirectBuffer(mappedBuffer);
+    void flush() throws IOException {
+        flush.flush();
     }
 }