From: Robert Varga Date: Sun, 5 May 2024 21:24:58 +0000 (+0200) Subject: Retain RandomAccessFile in JournalSegmentFile X-Git-Tag: v9.0.3~42 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=ece1376a462788f613f27ac5b099f364896de773;p=controller.git Retain RandomAccessFile in JournalSegmentFile Having a RandomAccessFile is nice, as it internally holds a FileChannel. This makes our files stateful -- which is okay, as we still manage their lifecycle via JournalSegment. JIRA: CONTROLLER-2099 Change-Id: Id8305c74dbd881eaf52d84191c11bb4ea2bc164b Signed-off-by: Robert Varga --- diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java index 697604e600..a934a90308 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -16,7 +16,6 @@ package io.atomix.storage.journal; import static com.google.common.base.Verify.verify; -import static java.util.Objects.requireNonNull; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,14 +37,14 @@ final class DiskFileReader extends FileReader { // tracks where memory's first available byte maps to in terms of FileChannel.position() private int bufferPosition; - DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize)); + DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) { + this(file, allocateBuffer(file.maxSize(), maxEntrySize)); } // Note: take ownership of the buffer - DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) { + DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) { super(file); - this.channel = requireNonNull(channel); + channel = file.channel(); this.buffer = buffer.flip(); bufferPosition = 0; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java index ffa11e819b..74fb2d8be8 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java @@ -29,12 +29,14 @@ final class DiskFileWriter extends FileWriter { private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); private final DiskFileReader reader; + private final FileChannel channel; private final ByteBuffer buffer; - DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - super(file, channel, maxEntrySize); + DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) { + super(file, maxEntrySize); + channel = file.channel(); buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize); - reader = new DiskFileReader(file, channel, buffer); + reader = new DiskFileReader(file, buffer); } @Override @@ -50,7 +52,7 @@ final class DiskFileWriter extends FileWriter { @Override MappedFileWriter toMapped() { flush(); - return new MappedFileWriter(file, channel, maxEntrySize); + return new MappedFileWriter(file, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java index 0a9bb3ef11..e9f06a15fc 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java @@ -25,7 +25,7 @@ import org.eclipse.jdt.annotation.NonNull; * An abstraction over how to read a {@link JournalSegmentFile}. */ abstract sealed class FileReader permits DiskFileReader, MappedFileReader { - private final JournalSegmentFile file; + private final @NonNull JournalSegmentFile file; FileReader(final JournalSegmentFile file) { this.file = requireNonNull(file); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java index 3e566fe90b..a677fc3d26 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; import org.eclipse.jdt.annotation.Nullable; /** @@ -28,12 +27,10 @@ import org.eclipse.jdt.annotation.Nullable; */ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { final JournalSegmentFile file; - final FileChannel channel; final int maxEntrySize; - FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + FileWriter(final JournalSegmentFile file, final int maxEntrySize) { this.file = requireNonNull(file); - this.channel = requireNonNull(channel); this.maxEntrySize = maxEntrySize; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index a7f4c5aadc..b73d942c03 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -23,9 +23,7 @@ import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; import io.atomix.storage.journal.index.SparseJournalIndex; import java.io.IOException; -import java.nio.channels.FileChannel; import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +45,6 @@ final class JournalSegment { private final JournalIndex journalIndex; private final Set readers = ConcurrentHashMap.newKeySet(); private final AtomicInteger references = new AtomicInteger(); - private final FileChannel channel; private JournalSegmentWriter writer; private boolean open = true; @@ -61,16 +58,10 @@ final class JournalSegment { this.storageLevel = requireNonNull(storageLevel); this.maxEntrySize = maxEntrySize; journalIndex = new SparseJournalIndex(indexDensity); - try { - channel = FileChannel.open(file.path(), - StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); - } catch (IOException e) { - throw new StorageException(e); - } final var fileWriter = switch (storageLevel) { - case DISK -> new DiskFileWriter(file, channel, maxEntrySize); - case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize); + case DISK -> new DiskFileWriter(file, maxEntrySize); + case MAPPED -> new MappedFileWriter(file, maxEntrySize); }; writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex) // relinquish mapped memory @@ -95,19 +86,6 @@ final class JournalSegment { return writer.getLastIndex(); } - /** - * Returns the size of the segment. - * - * @return the size of the segment - */ - int size() { - try { - return (int) channel.size(); - } catch (IOException e) { - throw new StorageException(e); - } - } - /** * Returns the segment file. * @@ -179,8 +157,7 @@ final class JournalSegment { acquire(); final var buffer = writer.buffer(); - final var fileReader = buffer != null ? new MappedFileReader(file, buffer) - : new DiskFileReader(file, channel, maxEntrySize); + final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize); final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize); reader.setPosition(JournalSegmentDescriptor.BYTES); readers.add(reader); @@ -235,7 +212,7 @@ final class JournalSegment { private void finishClose() { writer.close(); try { - channel.close(); + file.close(); } catch (IOException e) { throw new StorageException(e); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java index 24652f003b..97dbab72ef 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java @@ -18,9 +18,7 @@ package io.atomix.storage.journal; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; +import java.nio.channels.ReadableByteChannel; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; @@ -69,23 +67,20 @@ public record JournalSegmentDescriptor( static final int VERSION = 1; /** - * Read a JournalSegmentDescriptor from a {@link Path}. + * Read a JournalSegmentDescriptor from a {@link ReadableByteChannel}. * - * @param path path to read from + * @param channel channel to read from * @return A {@link JournalSegmentDescriptor} * @throws IOException if an I/O error occurs or there is not enough data */ - public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException { - final byte[] bytes; - try (var is = Files.newInputStream(path, StandardOpenOption.READ)) { - bytes = is.readNBytes(BYTES); + public static @NonNull JournalSegmentDescriptor readFrom(final ReadableByteChannel channel) throws IOException { + final var buffer = ByteBuffer.allocate(BYTES); + final var read = channel.read(buffer); + if (read != BYTES) { + throw new IOException("Need " + BYTES + " bytes, only " + read + " available"); } - if (bytes.length != BYTES) { - throw new IOException("Need " + BYTES + " bytes, only " + bytes.length + " available"); - } - - final var buffer = ByteBuffer.wrap(bytes); + buffer.flip(); return new JournalSegmentDescriptor( buffer.getInt(), buffer.getLong(), diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index a7ab481dbb..825baa27a2 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; @@ -37,25 +38,40 @@ final class JournalSegmentFile { private final @NonNull JournalSegmentDescriptor descriptor; private final @NonNull Path path; - private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) { + private final RandomAccessFile file; + + private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor, + final RandomAccessFile file) { this.path = requireNonNull(path); this.descriptor = requireNonNull(descriptor); + this.file = requireNonNull(file); } static @NonNull JournalSegmentFile createNew(final String name, final File directory, final JournalSegmentDescriptor descriptor) throws IOException { final var file = createSegmentFile(name, directory, descriptor.id()); - try (var raf = new RandomAccessFile(file, "rw")) { + final var raf = new RandomAccessFile(file, "rw"); + try { raf.setLength(descriptor.maxSegmentSize()); raf.write(descriptor.toArray()); + } catch (IOException e) { + raf.close(); + throw e; } - return new JournalSegmentFile(file.toPath(), descriptor); + return new JournalSegmentFile(file.toPath(), descriptor, raf); } static @NonNull JournalSegmentFile openExisting(final Path path) throws IOException { - // read the descriptor - final var descriptor = JournalSegmentDescriptor.readFrom(path); - return new JournalSegmentFile(path, descriptor); + final var raf = new RandomAccessFile(path.toFile(), "rw"); + final JournalSegmentDescriptor descriptor; + try { + // read the descriptor + descriptor = JournalSegmentDescriptor.readFrom(raf.getChannel()); + } catch (IOException e) { + raf.close(); + throw e; + } + return new JournalSegmentFile(path, descriptor, raf); } /** @@ -80,6 +96,18 @@ final class JournalSegmentFile { return descriptor.maxSegmentSize(); } + int size() throws IOException { + return (int) file.length(); + } + + FileChannel channel() { + return file.getChannel(); + } + + void close() throws IOException { + file.close(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java index 0849cffd0b..f91cdc827a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java @@ -30,10 +30,10 @@ final class MappedFileWriter extends FileWriter { private final MappedFileReader reader; private final ByteBuffer buffer; - MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - super(file, channel, maxEntrySize); + MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) { + super(file, maxEntrySize); - mappedBuffer = mapBuffer(channel, file.maxSize()); + mappedBuffer = mapBuffer(file.channel(), file.maxSize()); buffer = mappedBuffer.slice(); reader = new MappedFileReader(file, mappedBuffer); } @@ -64,7 +64,7 @@ final class MappedFileWriter extends FileWriter { @Override DiskFileWriter toDisk() { close(); - return new DiskFileWriter(file, channel, maxEntrySize); + return new DiskFileWriter(file, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 7e821277ce..23a5419b83 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -191,7 +191,13 @@ public final class SegmentedJournal implements Journal { */ public long size() { return segments.values().stream() - .mapToLong(JournalSegment::size) + .mapToLong(segment -> { + try { + return segment.file().size(); + } catch (IOException e) { + throw new StorageException(e); + } + }) .sum(); }