/*
- * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import org.checkerframework.checker.nullness.qual.NonNull;
/**
- * Log segment reader.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
+ * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
*/
-final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
+final class DiskFileReader extends FileReader {
private final FileChannel channel;
private final ByteBuffer buffer;
// tracks where memory's first available byte maps to in terms of FileChannel.position()
private int bufferPosition;
- DiskJournalSegmentReader(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalSerdes namespace) {
- super(segment, maxEntrySize, namespace);
+ DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) {
+ super(path);
this.channel = requireNonNull(channel);
- buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip();
+ buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip();
bufferPosition = 0;
}
- @Override void invalidateCache() {
+    /**
+     * Computes the size of the internal read-ahead buffer: twice the maximum entry footprint
+     * (payload plus {@link SegmentEntry#HEADER_BYTES} header). The factor of two presumably lets a
+     * buffered region cover an entry that straddles the previous read — TODO confirm rationale.
+     *
+     * @param maxEntrySize maximum size of a single serialized entry in bytes
+     * @return buffer capacity in bytes
+     */
+    private static int chooseBufferSize(final int maxEntrySize) {
+        return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2;
+    }
+
+ @Override
+ void invalidateCache() {
buffer.clear().flip();
bufferPosition = 0;
}
- @Override ByteBuffer read(final int position, final int size) {
+ @Override
+ ByteBuffer read(final int position, final int size) {
// calculate logical seek distance between buffer's first byte and position and split flow between
// forward-moving and backwards-moving code paths.
final int seek = bufferPosition - position;
return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
}
- private ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
+ private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
final int missing = buffer.limit() - seek - size;
if (missing <= 0) {
// fast path: we have the requested region
return setAndSlice(position, size);
}
- ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+ private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
// TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
// do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
// read it.
return setAndSlice(position, size);
}
- void readAtLeast(final int readPosition, final int readAtLeast) {
+ private void readAtLeast(final int readPosition, final int readAtLeast) {
final int bytesRead;
try {
bytesRead = channel.read(buffer, readPosition);
buffer.flip();
}
- private ByteBuffer setAndSlice(final int position, final int size) {
+ private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
bufferPosition = position;
return buffer.slice(0, size).asReadOnlyBuffer();
}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * An abstraction over how to read a {@link JournalSegmentFile}.
+ */
+abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
+    // Path of the backing file; retained only for diagnostics via toString()
+    private final Path path;
+
+    FileReader(final Path path) {
+        this.path = requireNonNull(path);
+    }
+
+    /**
+     * Invalidate any cache that is present, so that the next read is coherent with the backing file.
+     */
+    abstract void invalidateCache();
+
+    /**
+     * Reads {@code size} bytes starting at the specified position. The sum of position and size is
+     * guaranteed by the caller not to exceed the maximum segment size nor the maximum entry size.
+     *
+     * @param position position of the entry header within the file
+     * @param size number of bytes to read
+     * @return resulting buffer
+     */
+    abstract @NonNull ByteBuffer read(int position, int size);
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("path", path).toString();
+    }
+}
acquire();
final var buffer = writer.buffer();
- final var reader = buffer == null
- ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
- : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+ final var path = file.file().toPath();
+ final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
+ : new DiskFileReader(path, channel, maxEntrySize);
+ final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
import static java.util.Objects.requireNonNull;
import com.esotericsoftware.kryo.KryoException;
-import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
+final class JournalSegmentReader<E> {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
private final JournalSegment<E> segment;
private final JournalSerdes namespace;
+ private final FileReader fileReader;
private final int maxSegmentSize;
private final int maxEntrySize;
private int position;
- JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
+ JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
+ final int maxEntrySize, final JournalSerdes namespace) {
this.segment = requireNonNull(segment);
+ this.fileReader = requireNonNull(fileReader);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
this.namespace = requireNonNull(namespace);
*
* @return current position.
*/
- final int position() {
+ int position() {
return position;
}
*
* @param position new position
*/
- final void setPosition(final int position) {
+ void setPosition(final int position) {
verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
"Invalid position %s", position);
this.position = position;
- invalidateCache();
+ fileReader.invalidateCache();
}
/**
* Invalidate any cache that is present, so that the next read is coherent with the backing file.
*/
- abstract void invalidateCache();
+ void invalidateCache() {
+ fileReader.invalidateCache();
+ }
/**
* Reads the next entry, assigning it specified index.
* @param index entry index
* @return The entry, or {@code null}
*/
- final @Nullable Indexed<E> readEntry(final long index) {
+ @Nullable Indexed<E> readEntry(final long index) {
// Check if there is enough in the buffer remaining
final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
if (remaining < 0) {
// Calculate maximum entry length not exceeding file size nor maxEntrySize
final var maxLength = Math.min(remaining, maxEntrySize);
- final var buffer = read(position, maxLength + SegmentEntry.HEADER_BYTES);
+ final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
// Read the entry length
final var length = buffer.getInt(0);
return new Indexed<>(index, entry, length);
}
- /**
- * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed
- * {@link #maxSegmentSize}.
- *
- * @param position position to the entry header
- * @param size to read, guaranteed to not exceed {@link #maxEntrySize}
- * @return resulting buffer
- */
- abstract ByteBuffer read(int position, int size);
-
/**
* Close this reader.
*/
- final void close() {
+ void close() {
segment.closeReader(this);
}
}
/*
- * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package io.atomix.storage.journal;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
/**
- * Log segment reader.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
+ * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file.
*/
-final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
+final class MappedFileReader extends FileReader {
private final ByteBuffer buffer;
- MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalSerdes namespace) {
- super(segment, maxEntrySize, namespace);
+ MappedFileReader(final Path path, final ByteBuffer buffer) {
+ super(path);
this.buffer = buffer.slice().asReadOnlyBuffer();
}