From 15af95195e18a4b81caf5d7762e8b8642beec022 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 5 May 2024 22:04:20 +0200 Subject: [PATCH] Expand JournalSegmentFile semantics JournalSegmentFile also includes a descriptor, making it more useful than just a File/Path holder. JIRA: CONTROLLER-2099 Change-Id: Ia808512b7c3012bdb8e749429ac4b785643fa6c7 Signed-off-by: Robert Varga --- .../storage/journal/DiskFileReader.java | 9 ++--- .../storage/journal/DiskFileWriter.java | 11 +++--- .../io/atomix/storage/journal/FileReader.java | 9 ++--- .../io/atomix/storage/journal/FileWriter.java | 11 ++---- .../storage/journal/JournalSegment.java | 36 +++++++----------- .../storage/journal/JournalSegmentFile.java | 38 ++++++++++++++----- .../storage/journal/JournalSegmentReader.java | 2 +- .../storage/journal/JournalSegmentWriter.java | 2 +- .../storage/journal/MappedFileReader.java | 5 +-- .../storage/journal/MappedFileWriter.java | 11 +++--- .../storage/journal/SegmentedJournal.java | 25 ++++++------ 11 files changed, 80 insertions(+), 79 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java index 311d16b150..697604e600 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; /** @@ -39,13 +38,13 @@ final class DiskFileReader extends FileReader { // tracks where memory's first available byte maps to in terms of FileChannel.position() private int bufferPosition; - DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { - this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize)); + DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize)); } // Note: take ownership of the buffer - DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) { - super(path); + DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) { + super(file); this.channel = requireNonNull(channel); this.buffer = buffer.flip(); bufferPosition = 0; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java index 5f468d46a1..ffa11e819b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Path; /** * A {@link StorageLevel#DISK} {@link FileWriter}. @@ -32,10 +31,10 @@ final class DiskFileWriter extends FileWriter { private final DiskFileReader reader; private final ByteBuffer buffer; - DiskFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { - super(path, channel, maxSegmentSize, maxEntrySize); - buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); - reader = new DiskFileReader(path, channel, buffer); + DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + super(file, channel, maxEntrySize); + buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize); + reader = new DiskFileReader(file, channel, buffer); } @Override @@ -51,7 +50,7 @@ final class DiskFileWriter extends FileWriter { @Override MappedFileWriter toMapped() { flush(); - return new MappedFileWriter(path, channel, maxSegmentSize, maxEntrySize); + return new MappedFileWriter(file, channel, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java index fdc0597d36..0a9bb3ef11 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java @@ -19,17 +19,16 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import java.nio.ByteBuffer; -import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; /** * An abstraction over how to read a {@link JournalSegmentFile}. */ abstract sealed class FileReader permits DiskFileReader, MappedFileReader { - private final Path path; + private final JournalSegmentFile file; - FileReader(final Path path) { - this.path = requireNonNull(path); + FileReader(final JournalSegmentFile file) { + this.file = requireNonNull(file); } /** @@ -49,6 +48,6 @@ abstract sealed class FileReader permits DiskFileReader, MappedFileReader { @Override public final String toString() { - return MoreObjects.toStringHelper(this).add("path", path).toString(); + return MoreObjects.toStringHelper(this).add("path", file.path()).toString(); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java index 4ead89bfb3..3e566fe90b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java @@ -21,22 +21,19 @@ import com.google.common.base.MoreObjects; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Path; import org.eclipse.jdt.annotation.Nullable; /** * An abstraction over how to write a {@link JournalSegmentFile}. */ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { - final Path path; + final JournalSegmentFile file; final FileChannel channel; - final int maxSegmentSize; final int maxEntrySize; - FileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { - this.path = requireNonNull(path); + FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + this.file = requireNonNull(file); this.channel = requireNonNull(channel); - this.maxSegmentSize = maxSegmentSize; this.maxEntrySize = maxEntrySize; } @@ -70,7 +67,7 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { @Override public final String toString() { - return MoreObjects.toStringHelper(this).add("path", path).toString(); + return MoreObjects.toStringHelper(this).add("path", file.path()).toString(); } abstract @Nullable MappedByteBuffer buffer(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index 02921bed2b..e8954bcc7f 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -16,6 +16,8 @@ */ package io.atomix.storage.journal; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; @@ -36,7 +38,6 @@ import org.eclipse.jdt.annotation.Nullable; */ final class JournalSegment implements AutoCloseable { private final JournalSegmentFile file; - private final JournalSegmentDescriptor descriptor; private final StorageLevel storageLevel; private final int maxEntrySize; private final JournalIndex journalIndex; @@ -49,25 +50,23 @@ final class JournalSegment implements AutoCloseable { JournalSegment( final JournalSegmentFile file, - final JournalSegmentDescriptor descriptor, final StorageLevel storageLevel, final int maxEntrySize, final double indexDensity) { - this.file = file; - this.descriptor = descriptor; - this.storageLevel = storageLevel; + this.file = requireNonNull(file); + this.storageLevel = requireNonNull(storageLevel); this.maxEntrySize = maxEntrySize; journalIndex = new SparseJournalIndex(indexDensity); try { - channel = FileChannel.open(file.file().toPath(), + channel = FileChannel.open(file.path(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); } catch (IOException e) { throw new StorageException(e); } final var fileWriter = switch (storageLevel) { - case DISK -> new DiskFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize); - case MAPPED -> new MappedFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize); + case DISK -> new DiskFileWriter(file, channel, maxEntrySize); + case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize); }; writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex) // relinquish mapped memory @@ -80,7 +79,7 @@ final class JournalSegment implements AutoCloseable { * @return The segment's starting index. */ long firstIndex() { - return descriptor.index(); + return file.descriptor().index(); } /** @@ -114,15 +113,6 @@ final class JournalSegment implements AutoCloseable { return file; } - /** - * Returns the segment descriptor. - * - * @return The segment descriptor. - */ - JournalSegmentDescriptor descriptor() { - return descriptor; - } - /** * Looks up the position of the given index. * @@ -185,9 +175,8 @@ final class JournalSegment implements AutoCloseable { acquire(); final var buffer = writer.buffer(); - final var path = file.file().toPath(); - final var fileReader = buffer != null ? new MappedFileReader(path, buffer) - : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize); + final var fileReader = buffer != null ? new MappedFileReader(file, buffer) + : new DiskFileReader(file, channel, maxEntrySize); final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize); reader.setPosition(JournalSegmentDescriptor.BYTES); readers.add(reader); @@ -253,7 +242,7 @@ final class JournalSegment implements AutoCloseable { */ void delete() { try { - Files.deleteIfExists(file.file().toPath()); + Files.deleteIfExists(file.path()); } catch (IOException e) { throw new StorageException(e); } @@ -261,10 +250,11 @@ final class JournalSegment implements AutoCloseable { @Override public String toString() { + final var descriptor = file.descriptor(); return MoreObjects.toStringHelper(this) .add("id", descriptor.id()) .add("version", descriptor.version()) - .add("index", firstIndex()) + .add("index", descriptor.index()) .toString(); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index 1306c4b79f..adc9910087 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -17,25 +17,27 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; +import com.google.common.base.MoreObjects; import java.io.File; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.NonNull; /** * Segment file utility. * * @author Jordan Halterman */ -public final class JournalSegmentFile { +final class JournalSegmentFile { private static final char PART_SEPARATOR = '-'; private static final char EXTENSION_SEPARATOR = '.'; private static final String EXTENSION = "log"; - private final File file; + private final @NonNull JournalSegmentDescriptor descriptor; + private final @NonNull Path path; - /** - * @throws IllegalArgumentException if {@code file} is not a valid segment file - */ - JournalSegmentFile(final File file) { - this.file = file; + JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) { + this.path = requireNonNull(path); + this.descriptor = requireNonNull(descriptor); } /** @@ -43,8 +45,26 @@ public final class JournalSegmentFile { * * @return The segment file. */ - public File file() { - return file; + @NonNull Path path() { + return path; + } + + /** + * Returns the segment descriptor. + * + * @return The segment descriptor. + */ + @NonNull JournalSegmentDescriptor descriptor() { + return descriptor; + } + + int maxSize() { + return descriptor.maxSegmentSize(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString(); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index d89c720c67..5258f4323c 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -38,7 +38,7 @@ final class JournalSegmentReader { JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) { this.segment = requireNonNull(segment); this.fileReader = requireNonNull(fileReader); - maxSegmentSize = segment.descriptor().maxSegmentSize(); + maxSegmentSize = segment.file().maxSize(); this.maxEntrySize = maxEntrySize; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index e381bc25a7..317e8fd45b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -44,7 +44,7 @@ final class JournalSegmentWriter { this.fileWriter = requireNonNull(fileWriter); this.segment = requireNonNull(segment); this.index = requireNonNull(index); - maxSegmentSize = segment.descriptor().maxSegmentSize(); + maxSegmentSize = segment.file().maxSize(); this.maxEntrySize = maxEntrySize; // adjust lastEntry value reset(0); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java index 204fd72550..9d129f89a1 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java @@ -16,7 +16,6 @@ package io.atomix.storage.journal; import java.nio.ByteBuffer; -import java.nio.file.Path; /** * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file. @@ -24,8 +23,8 @@ import java.nio.file.Path; final class MappedFileReader extends FileReader { private final ByteBuffer buffer; - MappedFileReader(final Path path, final ByteBuffer buffer) { - super(path); + MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) { + super(file); this.buffer = buffer.slice().asReadOnlyBuffer(); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java index 47f26ba151..0849cffd0b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; /** @@ -31,12 +30,12 @@ final class MappedFileWriter extends FileWriter { private final MappedFileReader reader; private final ByteBuffer buffer; - MappedFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { - super(path, channel, maxSegmentSize, maxEntrySize); + MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + super(file, channel, maxEntrySize); - mappedBuffer = mapBuffer(channel, maxSegmentSize); + mappedBuffer = mapBuffer(channel, file.maxSize()); buffer = mappedBuffer.slice(); - reader = new MappedFileReader(path, mappedBuffer); + reader = new MappedFileReader(file, mappedBuffer); } private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { @@ -65,7 +64,7 @@ final class MappedFileWriter extends FileWriter { @Override DiskFileWriter toDisk() { close(); - return new DiskFileWriter(path, channel, maxSegmentSize, maxEntrySize); + return new DiskFileWriter(file, channel, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 507b0776e9..c2ca89b40a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -236,8 +236,8 @@ public final class SegmentedJournal implements Journal { */ private synchronized void open() { // Load existing log segments from disk. - for (JournalSegment segment : loadSegments()) { - segments.put(segment.descriptor().index(), segment); + for (var segment : loadSegments()) { + segments.put(segment.firstIndex(), segment); } // If a segment doesn't already exist, create an initial segment starting at index 1. @@ -340,7 +340,7 @@ public final class SegmentedJournal implements Journal { final var index = currentSegment.lastIndex() + 1; final var lastSegment = getLastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1, index); + currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index); segments.put(index, currentSegment); return currentSegment; } @@ -409,7 +409,7 @@ public final class SegmentedJournal implements Journal { throw new StorageException(e); } - final var segment = newSegment(new JournalSegmentFile(segmentFile), descriptor); + final var segment = newSegment(new JournalSegmentFile(segmentFile.toPath(), descriptor)); LOG.debug("Created segment: {}", segment); return segment; } @@ -418,11 +418,10 @@ public final class SegmentedJournal implements Journal { * Creates a new segment instance. * * @param segmentFile The segment file. - * @param descriptor The segment descriptor. * @return The segment instance. */ - protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) { - return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity); + protected JournalSegment newSegment(JournalSegmentFile segmentFile) { + return new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); } /** @@ -441,18 +440,18 @@ public final class SegmentedJournal implements Journal { // If the file looks like a segment file, attempt to load the segment. if (JournalSegmentFile.isSegmentFile(name, file)) { + final var path = file.toPath(); // read the descriptor final JournalSegmentDescriptor descriptor; try { - descriptor = JournalSegmentDescriptor.readFrom(file.toPath()); + descriptor = JournalSegmentDescriptor.readFrom(path); } catch (IOException e) { throw new StorageException(e); } // Load the segment. - final var segmentFile = new JournalSegmentFile(file); - final var segment = newSegment(segmentFile, descriptor); - LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName()); + final var segment = newSegment(new JournalSegmentFile(path, descriptor)); + LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), path); // Add the segment to the segments list. segments.put(segment.firstIndex(), segment); @@ -466,8 +465,8 @@ public final class SegmentedJournal implements Journal { while (iterator.hasNext()) { final var segment = iterator.next().getValue(); if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) { - LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), - previousSegment.file().file()); + LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(), + previousSegment.file().path()); corrupted = true; } if (corrupted) { -- 2.36.6