From 323f56b4c4da7a19313dff1e35a60cf5e18f5942 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sat, 23 Mar 2024 22:52:25 +0100 Subject: [PATCH] Factor out FileReader interface As it turns out, JournalSegmentReader's specializations are providing a very simple API they provide via abstract methods. Extract this API into FileReader, allowing JournalSegmentReader to become a final class, reducing mental requirements to understand what is going on. JIRA: CONTROLLER-2109 Change-Id: I4199f10a9483b18ac381f16e2b0d818f2d74ac16 Signed-off-by: Robert Varga --- ...SegmentReader.java => DiskFileReader.java} | 36 +++++++------ .../io/atomix/storage/journal/FileReader.java | 54 +++++++++++++++++++ .../storage/journal/JournalSegment.java | 7 +-- .../storage/journal/JournalSegmentReader.java | 34 +++++------- ...gmentReader.java => MappedFileReader.java} | 15 +++--- 5 files changed, 98 insertions(+), 48 deletions(-) rename atomix-storage/src/main/java/io/atomix/storage/journal/{DiskJournalSegmentReader.java => DiskFileReader.java} (72%) create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java rename atomix-storage/src/main/java/io/atomix/storage/journal/{MappedJournalSegmentReader.java => MappedFileReader.java} (65%) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java similarity index 72% rename from atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java rename to atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java index 766a7e7d65..15c3d9b5b0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -1,6 +1,5 @@ /* - * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. - * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,40 +21,45 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Path; +import org.checkerframework.checker.nullness.qual.NonNull; /** - * Log segment reader. - * - * @author Jordan Halterman + * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer. */ -final class DiskJournalSegmentReader extends JournalSegmentReader { +final class DiskFileReader extends FileReader { private final FileChannel channel; private final ByteBuffer buffer; // tracks where memory's first available byte maps to in terms of FileChannel.position() private int bufferPosition; - DiskJournalSegmentReader(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, - final JournalSerdes namespace) { - super(segment, maxEntrySize, namespace); + DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) { + super(path); this.channel = requireNonNull(channel); - buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip(); + buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip(); bufferPosition = 0; } - @Override void invalidateCache() { + private static int chooseBufferSize(final int maxEntrySize) { + return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2; + } + + @Override + void invalidateCache() { buffer.clear().flip(); bufferPosition = 0; } - @Override ByteBuffer read(final int position, final int size) { + @Override + ByteBuffer read(final int position, final int size) { // calculate logical seek distance between buffer's first byte and position and split flow between // forward-moving and backwards-moving code paths. final int seek = bufferPosition - position; return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size); } - private ByteBuffer forwardAndRead(final int seek, final int position, final int size) { + private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) { final int missing = buffer.limit() - seek - size; if (missing <= 0) { // fast path: we have the requested region @@ -72,7 +76,7 @@ final class DiskJournalSegmentReader extends JournalSegmentReader { return setAndSlice(position, size); } - ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) { + private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) { // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and // do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and // read it. @@ -81,7 +85,7 @@ final class DiskJournalSegmentReader extends JournalSegmentReader { return setAndSlice(position, size); } - void readAtLeast(final int readPosition, final int readAtLeast) { + private void readAtLeast(final int readPosition, final int readAtLeast) { final int bytesRead; try { bytesRead = channel.read(buffer, readPosition); @@ -92,7 +96,7 @@ final class DiskJournalSegmentReader extends JournalSegmentReader { buffer.flip(); } - private ByteBuffer setAndSlice(final int position, final int size) { + private @NonNull ByteBuffer setAndSlice(final int position, final int size) { bufferPosition = position; return buffer.slice(0, size).asReadOnlyBuffer(); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java new file mode 100644 index 0000000000..fdc0597d36 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.NonNull; + +/** + * An abstraction over how to read a {@link JournalSegmentFile}. + */ +abstract sealed class FileReader permits DiskFileReader, MappedFileReader { + private final Path path; + + FileReader(final Path path) { + this.path = requireNonNull(path); + } + + /** + * Invalidate any cache that is present, so that the next read is coherent with the backing file. + */ + abstract void invalidateCache(); + + /** + * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed the maximum + * segment size nor maximum entry size. + * + * @param position position to the entry header + * @param size to read + * @return resulting buffer + */ + abstract @NonNull ByteBuffer read(int position, int size); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("path", path).toString(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index d21d9051b6..81699a094a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -185,9 +185,10 @@ final class JournalSegment implements AutoCloseable { acquire(); final var buffer = writer.buffer(); - final var reader = buffer == null - ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace) - : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace); + final var path = file.file().toPath(); + final var fileReader = buffer != null ? new MappedFileReader(path, buffer) + : new DiskFileReader(path, channel, maxEntrySize); + final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace); reader.setPosition(JournalSegmentDescriptor.BYTES); readers.add(reader); return reader; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index af07a1e953..93ccd1748e 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -19,24 +19,26 @@ import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; import com.esotericsoftware.kryo.KryoException; -import java.nio.ByteBuffer; import java.util.zip.CRC32; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, MappedJournalSegmentReader { +final class JournalSegmentReader { private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class); private final JournalSegment segment; private final JournalSerdes namespace; + private final FileReader fileReader; private final int maxSegmentSize; private final int maxEntrySize; private int position; - JournalSegmentReader(final JournalSegment segment, final int maxEntrySize, final JournalSerdes namespace) { + JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, + final int maxEntrySize, final JournalSerdes namespace) { this.segment = requireNonNull(segment); + this.fileReader = requireNonNull(fileReader); maxSegmentSize = segment.descriptor().maxSegmentSize(); this.maxEntrySize = maxEntrySize; this.namespace = requireNonNull(namespace); @@ -47,7 +49,7 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, * * @return current position. */ - final int position() { + int position() { return position; } @@ -56,17 +58,19 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, * * @param position new position */ - final void setPosition(final int position) { + void setPosition(final int position) { verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize, "Invalid position %s", position); this.position = position; - invalidateCache(); + fileReader.invalidateCache(); } /** * Invalidate any cache that is present, so that the next read is coherent with the backing file. */ - abstract void invalidateCache(); + void invalidateCache() { + fileReader.invalidateCache(); + } /** * Reads the next entry, assigning it specified index. @@ -74,7 +78,7 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, * @param index entry index * @return The entry, or {@code null} */ - final @Nullable Indexed readEntry(final long index) { + @Nullable Indexed readEntry(final long index) { // Check if there is enough in the buffer remaining final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES; if (remaining < 0) { @@ -84,7 +88,7 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, // Calculate maximum entry length not exceeding file size nor maxEntrySize final var maxLength = Math.min(remaining, maxEntrySize); - final var buffer = read(position, maxLength + SegmentEntry.HEADER_BYTES); + final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES); // Read the entry length final var length = buffer.getInt(0); @@ -127,20 +131,10 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, return new Indexed<>(index, entry, length); } - /** - * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed - * {@link #maxSegmentSize}. - * - * @param position position to the entry header - * @param size to read, guaranteed to not exceed {@link #maxEntrySize} - * @return resulting buffer - */ - abstract ByteBuffer read(int position, int size); - /** * Close this reader. */ - final void close() { + void close() { segment.closeReader(this); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java similarity index 65% rename from atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java rename to atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java index e905972358..204fd72550 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java @@ -1,6 +1,5 @@ /* - * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. - * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +16,16 @@ package io.atomix.storage.journal; import java.nio.ByteBuffer; +import java.nio.file.Path; /** - * Log segment reader. - * - * @author Jordan Halterman + * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file. */ -final class MappedJournalSegmentReader extends JournalSegmentReader { +final class MappedFileReader extends FileReader { private final ByteBuffer buffer; - MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment segment, final int maxEntrySize, - final JournalSerdes namespace) { - super(segment, maxEntrySize, namespace); + MappedFileReader(final Path path, final ByteBuffer buffer) { + super(path); this.buffer = buffer.slice().asReadOnlyBuffer(); } -- 2.36.6