Separate out {From,To}ByteBufMapper
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskFileReader.java
index 15c3d9b5b06ba5c797cc75fd588f20fea97aa341..961ba3ba43e3459a6f3f529607d142392444a1fd 100644 (file)
@@ -18,52 +18,58 @@ package io.atomix.storage.journal;
 import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import org.checkerframework.checker.nullness.qual.NonNull;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
  */
 final class DiskFileReader extends FileReader {
     private final FileChannel channel;
-    private final ByteBuffer buffer;
 
+    private ByteBuf buffer;
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
-    DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) {
-        super(path);
-        this.channel = requireNonNull(channel);
-        buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip();
+    // Note: take ownership of the buffer
+    DiskFileReader(final JournalSegmentFile file, final ByteBuf buffer) {
+        super(file);
+        this.buffer = requireNonNull(buffer);
+        channel = file.channel();
         bufferPosition = 0;
     }
 
-    private static int chooseBufferSize(final int maxEntrySize) {
-        return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2;
-    }
-
     @Override
     void invalidateCache() {
-        buffer.clear().flip();
+        buffer.clear();
         bufferPosition = 0;
     }
 
     @Override
-    ByteBuffer read(final int position, final int size) {
+    ByteBuf read(final int position, final int size) {
         // calculate logical seek distance between buffer's first byte and position and split flow between
         // forward-moving and backwards-moving code paths.
         final int seek = bufferPosition - position;
         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
     }
 
-    private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
-        final int missing = buffer.limit() - seek - size;
+    @Override
+    void release() {
+        final var local = buffer;
+        if (local != null) {
+            buffer = null;
+            local.release();
+        }
+    }
+
+    private @NonNull ByteBuf forwardAndRead(final int seek, final int position, final int size) {
+        final int remaining = buffer.writerIndex() - seek;
+        final int missing = remaining - size;
         if (missing <= 0) {
             // fast path: we have the requested region
-            return buffer.slice(seek, size).asReadOnlyBuffer();
+            return buffer.slice(seek, size).asReadOnly();
         }
 
         // We need to read more data, but let's salvage what we can:
@@ -71,12 +77,12 @@ final class DiskFileReader extends FileReader {
         // - run compact, which moves everything between position and limit onto the beginning of buffer and
         //   sets it up to receive more bytes
         // - start the read accounting for the seek
-        buffer.position(seek).compact();
+        buffer.writeBytes(buffer, seek, remaining);
         readAtLeast(position + seek, missing);
         return setAndSlice(position, size);
     }
 
-    private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+    private @NonNull ByteBuf rewindAndRead(final int rewindBy, final int position, final int size) {
         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
         //       read it.
@@ -88,16 +94,15 @@ final class DiskFileReader extends FileReader {
     private void readAtLeast(final int readPosition, final int readAtLeast) {
         final int bytesRead;
         try {
-            bytesRead = channel.read(buffer, readPosition);
+            bytesRead = buffer.writeBytes(channel, readPosition, readAtLeast);
         } catch (IOException e) {
             throw new StorageException(e);
         }
         verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
-        buffer.flip();
     }
 
-    private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
+    private @NonNull ByteBuf setAndSlice(final int position, final int size) {
         bufferPosition = position;
-        return buffer.slice(0, size).asReadOnlyBuffer();
+        return buffer.slice(0, size).asReadOnly();
     }
 }