Retain RandomAccessFile in JournalSegmentFile 27/111627/4
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 21:24:58 +0000 (23:24 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 22:49:02 +0000 (00:49 +0200)
Having a RandomAccessFile is nice, as it internally holds a FileChannel.
This makes our files stateful -- which is okay, as we still manage their
lifecycle via JournalSegment.

JIRA: CONTROLLER-2099
Change-Id: Id8305c74dbd881eaf52d84191c11bb4ea2bc164b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java

index 697604e60069d962f2b567877e01f409745a493f..a934a903085c28f76a06f504610ec0ee1f63be6d 100644 (file)
@@ -16,7 +16,6 @@
 package io.atomix.storage.journal;
 
 import static com.google.common.base.Verify.verify;
-import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -38,14 +37,14 @@ final class DiskFileReader extends FileReader {
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
-    DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
-        this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize));
+    DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) {
+        this(file, allocateBuffer(file.maxSize(), maxEntrySize));
     }
 
     // Note: take ownership of the buffer
-    DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) {
+    DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
         super(file);
-        this.channel = requireNonNull(channel);
+        channel = file.channel();
         this.buffer = buffer.flip();
         bufferPosition = 0;
     }
index ffa11e819b8673eb5ac0223a23abd4124d0e8522..74fb2d8be84b152899be4fdb518ec3389ec36229 100644 (file)
@@ -29,12 +29,14 @@ final class DiskFileWriter extends FileWriter {
     private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
 
     private final DiskFileReader reader;
+    private final FileChannel channel;
     private final ByteBuffer buffer;
 
-    DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
-        super(file, channel, maxEntrySize);
+    DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+        super(file, maxEntrySize);
+        channel = file.channel();
         buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize);
-        reader = new DiskFileReader(file, channel, buffer);
+        reader = new DiskFileReader(file, buffer);
     }
 
     @Override
@@ -50,7 +52,7 @@ final class DiskFileWriter extends FileWriter {
     @Override
     MappedFileWriter toMapped() {
         flush();
-        return new MappedFileWriter(file, channel, maxEntrySize);
+        return new MappedFileWriter(file, maxEntrySize);
     }
 
     @Override
index 0a9bb3ef11f891cd92f4a84f0f12a21324fccbcf..e9f06a15fc988f482d7e3ba840782fcdbb43ad02 100644 (file)
@@ -25,7 +25,7 @@ import org.eclipse.jdt.annotation.NonNull;
  * An abstraction over how to read a {@link JournalSegmentFile}.
  */
 abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
-    private final JournalSegmentFile file;
+    private final @NonNull JournalSegmentFile file;
 
     FileReader(final JournalSegmentFile file) {
         this.file = requireNonNull(file);
index 3e566fe90b5db192159fbab0ddb8152cdd7e5cf9..a677fc3d26e75d2e0bbfbd7d793ca3dab77dc929 100644 (file)
@@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.base.MoreObjects;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 import org.eclipse.jdt.annotation.Nullable;
 
 /**
@@ -28,12 +27,10 @@ import org.eclipse.jdt.annotation.Nullable;
  */
 abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
     final JournalSegmentFile file;
-    final FileChannel channel;
     final int maxEntrySize;
 
-    FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+    FileWriter(final JournalSegmentFile file, final int maxEntrySize) {
         this.file = requireNonNull(file);
-        this.channel = requireNonNull(channel);
         this.maxEntrySize = maxEntrySize;
     }
 
index a7f4c5aadc5c2845723ec362f60412b47b373c83..b73d942c036d7c12d760fe23c31c91a36e7f0e8e 100644 (file)
@@ -23,9 +23,7 @@ import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.Position;
 import io.atomix.storage.journal.index.SparseJournalIndex;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
 import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -47,7 +45,6 @@ final class JournalSegment {
   private final JournalIndex journalIndex;
   private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
-  private final FileChannel channel;
 
   private JournalSegmentWriter writer;
   private boolean open = true;
@@ -61,16 +58,10 @@ final class JournalSegment {
     this.storageLevel = requireNonNull(storageLevel);
     this.maxEntrySize = maxEntrySize;
     journalIndex = new SparseJournalIndex(indexDensity);
-    try {
-      channel = FileChannel.open(file.path(),
-        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
 
     final var fileWriter = switch (storageLevel) {
-        case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
-        case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
+        case DISK -> new DiskFileWriter(file, maxEntrySize);
+        case MAPPED -> new MappedFileWriter(file, maxEntrySize);
     };
     writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
         // relinquish mapped memory
@@ -95,19 +86,6 @@ final class JournalSegment {
     return writer.getLastIndex();
   }
 
-  /**
-   * Returns the size of the segment.
-   *
-   * @return the size of the segment
-   */
-  int size() {
-    try {
-      return (int) channel.size();
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
   /**
    * Returns the segment file.
    *
@@ -179,8 +157,7 @@ final class JournalSegment {
     acquire();
 
     final var buffer = writer.buffer();
-    final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
-        : new DiskFileReader(file, channel, maxEntrySize);
+    final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
     final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
     reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
@@ -235,7 +212,7 @@ final class JournalSegment {
   private void finishClose() {
     writer.close();
     try {
-      channel.close();
+      file.close();
     } catch (IOException e) {
       throw new StorageException(e);
     }
index 24652f003b99f4d15e7effec033dfe8134e1d7d4..97dbab72ef3c5ac45c9a0cc3d144ad8b64d91ddf 100644 (file)
@@ -18,9 +18,7 @@ package io.atomix.storage.journal;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
+import java.nio.channels.ReadableByteChannel;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
@@ -69,23 +67,20 @@ public record JournalSegmentDescriptor(
     static final int VERSION = 1;
 
     /**
-     * Read a JournalSegmentDescriptor from a {@link Path}.
+     * Read a JournalSegmentDescriptor from a {@link ReadableByteChannel}.
      *
-     * @param path path to read from
+     * @param channel channel to read from
      * @return A {@link JournalSegmentDescriptor}
      * @throws IOException if an I/O error occurs or there is not enough data
      */
-    public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException {
-        final byte[] bytes;
-        try (var is = Files.newInputStream(path, StandardOpenOption.READ)) {
-            bytes = is.readNBytes(BYTES);
+    public static @NonNull JournalSegmentDescriptor readFrom(final ReadableByteChannel channel) throws IOException {
+        final var buffer = ByteBuffer.allocate(BYTES);
+        final var read = channel.read(buffer);
+        if (read != BYTES) {
+            throw new IOException("Need " + BYTES + " bytes, only " + read + " available");
         }
 
-        if (bytes.length != BYTES) {
-            throw new IOException("Need " + BYTES + " bytes, only " + bytes.length + " available");
-        }
-
-        final var buffer = ByteBuffer.wrap(bytes);
+        buffer.flip();
         return new JournalSegmentDescriptor(
             buffer.getInt(),
             buffer.getLong(),
index a7ab481dbb155a67a4903336701794e77a056257..825baa27a2ca47bfb05374e964d1c3817ce32d64 100644 (file)
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNull;
 
@@ -37,25 +38,40 @@ final class JournalSegmentFile {
     private final @NonNull JournalSegmentDescriptor descriptor;
     private final @NonNull Path path;
 
-    private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) {
+    private final RandomAccessFile file;
+
+    private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor,
+            final RandomAccessFile file) {
         this.path = requireNonNull(path);
         this.descriptor = requireNonNull(descriptor);
+        this.file = requireNonNull(file);
     }
 
     static @NonNull JournalSegmentFile createNew(final String name, final File directory,
             final JournalSegmentDescriptor descriptor) throws IOException {
         final var file = createSegmentFile(name, directory, descriptor.id());
-        try (var raf = new RandomAccessFile(file, "rw")) {
+        final var raf = new RandomAccessFile(file, "rw");
+        try {
             raf.setLength(descriptor.maxSegmentSize());
             raf.write(descriptor.toArray());
+        } catch (IOException e) {
+            raf.close();
+            throw e;
         }
-        return new JournalSegmentFile(file.toPath(), descriptor);
+        return new JournalSegmentFile(file.toPath(), descriptor, raf);
     }
 
     static @NonNull JournalSegmentFile openExisting(final Path path) throws IOException {
-        // read the descriptor
-        final var descriptor = JournalSegmentDescriptor.readFrom(path);
-        return new JournalSegmentFile(path, descriptor);
+        final var raf = new RandomAccessFile(path.toFile(), "rw");
+        final JournalSegmentDescriptor descriptor;
+        try {
+            // read the descriptor
+            descriptor = JournalSegmentDescriptor.readFrom(raf.getChannel());
+        } catch (IOException e) {
+            raf.close();
+            throw e;
+        }
+        return new JournalSegmentFile(path, descriptor, raf);
     }
 
     /**
@@ -80,6 +96,18 @@ final class JournalSegmentFile {
         return descriptor.maxSegmentSize();
     }
 
+    int size() throws IOException {
+        return (int) file.length();
+    }
+
+    FileChannel channel() {
+        return file.getChannel();
+    }
+
+    void close() throws IOException {
+        file.close();
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
index 0849cffd0b5cc5170667cd85b050ccaa8051619e..f91cdc827ae47dd4a27d5dc74989974df600435f 100644 (file)
@@ -30,10 +30,10 @@ final class MappedFileWriter extends FileWriter {
     private final MappedFileReader reader;
     private final ByteBuffer buffer;
 
-    MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
-        super(file, channel, maxEntrySize);
+    MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+        super(file, maxEntrySize);
 
-        mappedBuffer = mapBuffer(channel, file.maxSize());
+        mappedBuffer = mapBuffer(file.channel(), file.maxSize());
         buffer = mappedBuffer.slice();
         reader = new MappedFileReader(file, mappedBuffer);
     }
@@ -64,7 +64,7 @@ final class MappedFileWriter extends FileWriter {
     @Override
     DiskFileWriter toDisk() {
         close();
-        return new DiskFileWriter(file, channel, maxEntrySize);
+        return new DiskFileWriter(file, maxEntrySize);
     }
 
     @Override
index 7e821277ce5e344fead93016cdee48f4d7f89deb..23a5419b8333befb67031b187d3efad17a35a9a9 100644 (file)
@@ -191,7 +191,13 @@ public final class SegmentedJournal<E> implements Journal<E> {
    */
   public long size() {
     return segments.values().stream()
-        .mapToLong(JournalSegment::size)
+        .mapToLong(segment -> {
+          try {
+            return segment.file().size();
+          } catch (IOException e) {
+            throw new StorageException(e);
+          }
+        })
         .sum();
   }