Expand JournalSegmentFile semantics 24/111624/1
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 20:04:20 +0000 (22:04 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 20:08:00 +0000 (22:08 +0200)
JournalSegmentFile also includes a descriptor, making it more useful
than just a File/Path holder.

JIRA: CONTROLLER-2099
Change-Id: Ia808512b7c3012bdb8e749429ac4b785643fa6c7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java

index 311d16b1500201fb39edb77fc0dc67ba4265ce53..697604e60069d962f2b567877e01f409745a493f 100644 (file)
@@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
@@ -39,13 +38,13 @@ final class DiskFileReader extends FileReader {
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
-    DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
-        this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize));
+    DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+        this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize));
     }
 
     // Note: take ownership of the buffer
-    DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) {
-        super(path);
+    DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) {
+        super(file);
         this.channel = requireNonNull(channel);
         this.buffer = buffer.flip();
         bufferPosition = 0;
index 5f468d46a123d542003af36a652d12da79dccdad..ffa11e819b8673eb5ac0223a23abd4124d0e8522 100644 (file)
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Path;
 
 /**
  * A {@link StorageLevel#DISK} {@link FileWriter}.
@@ -32,10 +31,10 @@ final class DiskFileWriter extends FileWriter {
     private final DiskFileReader reader;
     private final ByteBuffer buffer;
 
-    DiskFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
-        super(path, channel, maxSegmentSize, maxEntrySize);
-        buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-        reader = new DiskFileReader(path, channel, buffer);
+    DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+        super(file, channel, maxEntrySize);
+        buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize);
+        reader = new DiskFileReader(file, channel, buffer);
     }
 
     @Override
@@ -51,7 +50,7 @@ final class DiskFileWriter extends FileWriter {
     @Override
     MappedFileWriter toMapped() {
         flush();
-        return new MappedFileWriter(path, channel, maxSegmentSize, maxEntrySize);
+        return new MappedFileWriter(file, channel, maxEntrySize);
     }
 
     @Override
index fdc0597d36189017640bbd49045a03e6726b5f8c..0a9bb3ef11f891cd92f4a84f0f12a21324fccbcf 100644 (file)
@@ -19,17 +19,16 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
 import java.nio.ByteBuffer;
-import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * An abstraction over how to read a {@link JournalSegmentFile}.
  */
 abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
-    private final Path path;
+    private final JournalSegmentFile file;
 
-    FileReader(final Path path) {
-        this.path = requireNonNull(path);
+    FileReader(final JournalSegmentFile file) {
+        this.file = requireNonNull(file);
     }
 
     /**
@@ -49,6 +48,6 @@ abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
 
     @Override
     public final String toString() {
-        return MoreObjects.toStringHelper(this).add("path", path).toString();
+        return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
     }
 }
index 4ead89bfb3789bb814b4e3ba620049cb95fedb74..3e566fe90b5db192159fbab0ddb8152cdd7e5cf9 100644 (file)
@@ -21,22 +21,19 @@ import com.google.common.base.MoreObjects;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Path;
 import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * An abstraction over how to write a {@link JournalSegmentFile}.
  */
 abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
-    final Path path;
+    final JournalSegmentFile file;
     final FileChannel channel;
-    final int maxSegmentSize;
     final int maxEntrySize;
 
-    FileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
-        this.path = requireNonNull(path);
+    FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+        this.file = requireNonNull(file);
         this.channel = requireNonNull(channel);
-        this.maxSegmentSize = maxSegmentSize;
         this.maxEntrySize = maxEntrySize;
     }
 
@@ -70,7 +67,7 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
 
     @Override
     public final String toString() {
-        return MoreObjects.toStringHelper(this).add("path", path).toString();
+        return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
     }
 
     abstract @Nullable MappedByteBuffer buffer();
index 02921bed2b0550e2733706251ffe473228d66623..e8954bcc7fb4c4667b9d6d545795b5eee3fa532f 100644 (file)
@@ -16,6 +16,8 @@
  */
 package io.atomix.storage.journal;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.Position;
@@ -36,7 +38,6 @@ import org.eclipse.jdt.annotation.Nullable;
  */
 final class JournalSegment implements AutoCloseable {
   private final JournalSegmentFile file;
-  private final JournalSegmentDescriptor descriptor;
   private final StorageLevel storageLevel;
   private final int maxEntrySize;
   private final JournalIndex journalIndex;
@@ -49,25 +50,23 @@ final class JournalSegment implements AutoCloseable {
 
   JournalSegment(
       final JournalSegmentFile file,
-      final JournalSegmentDescriptor descriptor,
       final StorageLevel storageLevel,
       final int maxEntrySize,
       final double indexDensity) {
-    this.file = file;
-    this.descriptor = descriptor;
-    this.storageLevel = storageLevel;
+    this.file = requireNonNull(file);
+    this.storageLevel = requireNonNull(storageLevel);
     this.maxEntrySize = maxEntrySize;
     journalIndex = new SparseJournalIndex(indexDensity);
     try {
-      channel = FileChannel.open(file.file().toPath(),
+      channel = FileChannel.open(file.path(),
         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
     } catch (IOException e) {
       throw new StorageException(e);
     }
 
     final var fileWriter = switch (storageLevel) {
-        case DISK -> new DiskFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize);
-        case MAPPED -> new MappedFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize);
+        case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
+        case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
     };
     writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
         // relinquish mapped memory
@@ -80,7 +79,7 @@ final class JournalSegment implements AutoCloseable {
    * @return The segment's starting index.
    */
   long firstIndex() {
-    return descriptor.index();
+    return file.descriptor().index();
   }
 
   /**
@@ -114,15 +113,6 @@ final class JournalSegment implements AutoCloseable {
     return file;
   }
 
-  /**
-   * Returns the segment descriptor.
-   *
-   * @return The segment descriptor.
-   */
-  JournalSegmentDescriptor descriptor() {
-    return descriptor;
-  }
-
   /**
    * Looks up the position of the given index.
    *
@@ -185,9 +175,8 @@ final class JournalSegment implements AutoCloseable {
     acquire();
 
     final var buffer = writer.buffer();
-    final var path = file.file().toPath();
-    final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
-        : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize);
+    final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
+        : new DiskFileReader(file, channel, maxEntrySize);
     final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
     reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
@@ -253,7 +242,7 @@ final class JournalSegment implements AutoCloseable {
    */
   void delete() {
     try {
-      Files.deleteIfExists(file.file().toPath());
+      Files.deleteIfExists(file.path());
     } catch (IOException e) {
       throw new StorageException(e);
     }
@@ -261,10 +250,11 @@ final class JournalSegment implements AutoCloseable {
 
   @Override
   public String toString() {
+    final var descriptor = file.descriptor();
     return MoreObjects.toStringHelper(this)
         .add("id", descriptor.id())
         .add("version", descriptor.version())
-        .add("index", firstIndex())
+        .add("index", descriptor.index())
         .toString();
   }
 }
index 1306c4b79f4e9b670e3a161391c888fa25805ad4..adc99100877c18c9a969b06b278b88b5ec1721bb 100644 (file)
@@ -17,25 +17,27 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.base.MoreObjects;
 import java.io.File;
+import java.nio.file.Path;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * Segment file utility.
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-public final class JournalSegmentFile {
+final class JournalSegmentFile {
     private static final char PART_SEPARATOR = '-';
     private static final char EXTENSION_SEPARATOR = '.';
     private static final String EXTENSION = "log";
 
-    private final File file;
+    private final @NonNull JournalSegmentDescriptor descriptor;
+    private final @NonNull Path path;
 
-    /**
-     * @throws IllegalArgumentException if {@code file} is not a valid segment file
-     */
-    JournalSegmentFile(final File file) {
-        this.file = file;
+    JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) {
+        this.path = requireNonNull(path);
+        this.descriptor = requireNonNull(descriptor);
     }
 
     /**
@@ -43,8 +45,26 @@ public final class JournalSegmentFile {
      *
      * @return The segment file.
      */
-    public File file() {
-        return file;
+    @NonNull Path path() {
+        return path;
+    }
+
+    /**
+     * Returns the segment descriptor.
+     *
+     * @return The segment descriptor.
+     */
+    @NonNull JournalSegmentDescriptor descriptor() {
+        return descriptor;
+    }
+
+    int maxSize() {
+        return descriptor.maxSegmentSize();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
     }
 
     /**
index d89c720c67fef07de93cf623581893fe689dc1ae..5258f4323caaf5ac401ec1f1c0c8644e77225265 100644 (file)
@@ -38,7 +38,7 @@ final class JournalSegmentReader {
     JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
         this.segment = requireNonNull(segment);
         this.fileReader = requireNonNull(fileReader);
-        maxSegmentSize = segment.descriptor().maxSegmentSize();
+        maxSegmentSize = segment.file().maxSize();
         this.maxEntrySize = maxEntrySize;
     }
 
index e381bc25a7d167935b403098778a8d617fc33701..317e8fd45b5360b0d84f5a52f91328da8de7354e 100644 (file)
@@ -44,7 +44,7 @@ final class JournalSegmentWriter {
         this.fileWriter = requireNonNull(fileWriter);
         this.segment = requireNonNull(segment);
         this.index = requireNonNull(index);
-        maxSegmentSize = segment.descriptor().maxSegmentSize();
+        maxSegmentSize = segment.file().maxSize();
         this.maxEntrySize = maxEntrySize;
         // adjust lastEntry value
         reset(0);
index 204fd7255022a9ab2f0d5d303f7ee380b559addb..9d129f89a18df3ab696ab1e954e7b323924f25e4 100644 (file)
@@ -16,7 +16,6 @@
 package io.atomix.storage.journal;
 
 import java.nio.ByteBuffer;
-import java.nio.file.Path;
 
 /**
  * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file.
@@ -24,8 +23,8 @@ import java.nio.file.Path;
 final class MappedFileReader extends FileReader {
     private final ByteBuffer buffer;
 
-    MappedFileReader(final Path path, final ByteBuffer buffer) {
-        super(path);
+    MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
+        super(file);
         this.buffer = buffer.slice().asReadOnlyBuffer();
     }
 
index 47f26ba151b6dcb1740f77b83d3a78db86897d48..0849cffd0b5cc5170667cd85b050ccaa8051619e 100644 (file)
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
@@ -31,12 +30,12 @@ final class MappedFileWriter extends FileWriter {
     private final MappedFileReader reader;
     private final ByteBuffer buffer;
 
-    MappedFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
-        super(path, channel, maxSegmentSize, maxEntrySize);
+    MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+        super(file, channel, maxEntrySize);
 
-        mappedBuffer = mapBuffer(channel, maxSegmentSize);
+        mappedBuffer = mapBuffer(channel, file.maxSize());
         buffer = mappedBuffer.slice();
-        reader = new MappedFileReader(path, mappedBuffer);
+        reader = new MappedFileReader(file, mappedBuffer);
     }
 
     private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
@@ -65,7 +64,7 @@ final class MappedFileWriter extends FileWriter {
     @Override
     DiskFileWriter toDisk() {
         close();
-        return new DiskFileWriter(path, channel, maxSegmentSize, maxEntrySize);
+        return new DiskFileWriter(file, channel, maxEntrySize);
     }
 
     @Override
index 507b0776e95b446fc8c67874704ccb2a43583a2b..c2ca89b40ac381b18f24720d4b62761aeaf4da52 100644 (file)
@@ -236,8 +236,8 @@ public final class SegmentedJournal<E> implements Journal<E> {
    */
   private synchronized void open() {
     // Load existing log segments from disk.
-    for (JournalSegment segment : loadSegments()) {
-      segments.put(segment.descriptor().index(), segment);
+    for (var segment : loadSegments()) {
+      segments.put(segment.firstIndex(), segment);
     }
 
     // If a segment doesn't already exist, create an initial segment starting at index 1.
@@ -340,7 +340,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
 
     final var index = currentSegment.lastIndex() + 1;
     final var lastSegment = getLastSegment();
-    currentSegment = createSegment(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1, index);
+    currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index);
     segments.put(index, currentSegment);
     return currentSegment;
   }
@@ -409,7 +409,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
       throw new StorageException(e);
     }
 
-    final var segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+    final var segment = newSegment(new JournalSegmentFile(segmentFile.toPath(), descriptor));
     LOG.debug("Created segment: {}", segment);
     return segment;
   }
@@ -418,11 +418,10 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * Creates a new segment instance.
    *
    * @param segmentFile The segment file.
-   * @param descriptor The segment descriptor.
    * @return The segment instance.
    */
-  protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
-    return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
+  protected JournalSegment newSegment(JournalSegmentFile segmentFile) {
+    return new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
   }
 
   /**
@@ -441,18 +440,18 @@ public final class SegmentedJournal<E> implements Journal<E> {
 
       // If the file looks like a segment file, attempt to load the segment.
       if (JournalSegmentFile.isSegmentFile(name, file)) {
+        final var path = file.toPath();
         // read the descriptor
         final JournalSegmentDescriptor descriptor;
         try {
-          descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
+          descriptor = JournalSegmentDescriptor.readFrom(path);
         } catch (IOException e) {
           throw new StorageException(e);
         }
 
         // Load the segment.
-        final var segmentFile = new JournalSegmentFile(file);
-        final var segment = newSegment(segmentFile, descriptor);
-        LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
+        final var segment = newSegment(new JournalSegmentFile(path, descriptor));
+        LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), path);
 
         // Add the segment to the segments list.
         segments.put(segment.firstIndex(), segment);
@@ -466,8 +465,8 @@ public final class SegmentedJournal<E> implements Journal<E> {
     while (iterator.hasNext()) {
       final var segment = iterator.next().getValue();
       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
-        LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
-            previousSegment.file().file());
+        LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
+            previousSegment.file().path());
         corrupted = true;
       }
       if (corrupted) {