Retain RandomAccessFile in JournalSegmentFile
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskFileReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import org.eclipse.jdt.annotation.NonNull;
24
25 /**
26  * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
27  */
28 final class DiskFileReader extends FileReader {
29     /**
30      * Just do not bother with IO smaller than this many bytes.
31      */
32     private static final int MIN_IO_SIZE = 8192;
33
34     private final FileChannel channel;
35     private final ByteBuffer buffer;
36
37     // tracks where memory's first available byte maps to in terms of FileChannel.position()
38     private int bufferPosition;
39
40     DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) {
41         this(file, allocateBuffer(file.maxSize(), maxEntrySize));
42     }
43
44     // Note: take ownership of the buffer
45     DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
46         super(file);
47         channel = file.channel();
48         this.buffer = buffer.flip();
49         bufferPosition = 0;
50     }
51
52     static ByteBuffer allocateBuffer(final int maxSegmentSize, final int maxEntrySize) {
53         return ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize));
54     }
55
56     private static int chooseBufferSize(final int maxSegmentSize, final int maxEntrySize) {
57         if (maxSegmentSize <= MIN_IO_SIZE) {
58             // just buffer the entire segment
59             return maxSegmentSize;
60         }
61
62         // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
63         final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
64         return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
65     }
66
67     @Override
68     void invalidateCache() {
69         buffer.clear().flip();
70         bufferPosition = 0;
71     }
72
73     @Override
74     ByteBuffer read(final int position, final int size) {
75         // calculate logical seek distance between buffer's first byte and position and split flow between
76         // forward-moving and backwards-moving code paths.
77         final int seek = bufferPosition - position;
78         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
79     }
80
81     private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
82         final int missing = buffer.limit() - seek - size;
83         if (missing <= 0) {
84             // fast path: we have the requested region
85             return buffer.slice(seek, size).asReadOnlyBuffer();
86         }
87
88         // We need to read more data, but let's salvage what we can:
89         // - set buffer position to seek, which means it points to the same as position
90         // - run compact, which moves everything between position and limit onto the beginning of buffer and
91         //   sets it up to receive more bytes
92         // - start the read accounting for the seek
93         buffer.position(seek).compact();
94         readAtLeast(position + seek, missing);
95         return setAndSlice(position, size);
96     }
97
98     private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
99         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
100         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
101         //       read it.
102         buffer.clear();
103         readAtLeast(position, size);
104         return setAndSlice(position, size);
105     }
106
107     private void readAtLeast(final int readPosition, final int readAtLeast) {
108         final int bytesRead;
109         try {
110             bytesRead = channel.read(buffer, readPosition);
111         } catch (IOException e) {
112             throw new StorageException(e);
113         }
114         verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
115         buffer.flip();
116     }
117
118     private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
119         bufferPosition = position;
120         return buffer.slice(0, size).asReadOnlyBuffer();
121     }
122 }