import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import org.checkerframework.checker.nullness.qual.NonNull;
+import org.eclipse.jdt.annotation.NonNull;
/**
* A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
*/
final class DiskFileReader extends FileReader {
private final FileChannel channel;
- private final ByteBuffer buffer;
+ private ByteBuf buffer;
// tracks where memory's first available byte maps to in terms of FileChannel.position()
private int bufferPosition;
- DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) {
- super(path);
- this.channel = requireNonNull(channel);
- buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip();
+ // Note: take ownership of the buffer
+ DiskFileReader(final JournalSegmentFile file, final ByteBuf buffer) {
+ super(file);
+ this.buffer = requireNonNull(buffer);
+ channel = file.channel();
bufferPosition = 0;
}
- private static int chooseBufferSize(final int maxEntrySize) {
- return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2;
- }
-
@Override
void invalidateCache() {
- buffer.clear().flip();
+ buffer.clear();
bufferPosition = 0;
}
@Override
- ByteBuffer read(final int position, final int size) {
+ ByteBuf read(final int position, final int size) {
// calculate logical seek distance between buffer's first byte and position and split flow between
// forward-moving and backwards-moving code paths.
final int seek = bufferPosition - position;
return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
}
- private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
- final int missing = buffer.limit() - seek - size;
+ @Override
+ void release() {
+ final var local = buffer;
+ if (local != null) {
+ buffer = null;
+ local.release();
+ }
+ }
+
+ private @NonNull ByteBuf forwardAndRead(final int seek, final int position, final int size) {
+ final int remaining = buffer.writerIndex() - seek;
+ final int missing = remaining - size;
if (missing <= 0) {
// fast path: we have the requested region
- return buffer.slice(seek, size).asReadOnlyBuffer();
+ return buffer.slice(seek, size).asReadOnly();
}
// We need to read more data, but let's salvage what we can:
// - run compact, which moves everything between position and limit onto the beginning of buffer and
// sets it up to receive more bytes
// - start the read accounting for the seek
- buffer.position(seek).compact();
+ buffer.writeBytes(buffer, seek, remaining);
readAtLeast(position + seek, missing);
return setAndSlice(position, size);
}
- private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+ private @NonNull ByteBuf rewindAndRead(final int rewindBy, final int position, final int size) {
// TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
// do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
// read it.
private void readAtLeast(final int readPosition, final int readAtLeast) {
final int bytesRead;
try {
- bytesRead = channel.read(buffer, readPosition);
+ bytesRead = buffer.writeBytes(channel, readPosition, readAtLeast);
} catch (IOException e) {
throw new StorageException(e);
}
verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
- buffer.flip();
}
- private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
+ private @NonNull ByteBuf setAndSlice(final int position, final int size) {
bufferPosition = position;
- return buffer.slice(0, size).asReadOnlyBuffer();
+ return buffer.slice(0, size).asReadOnly();
}
}