Factor out FileReader interface
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskFileReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Path;
25 import org.checkerframework.checker.nullness.qual.NonNull;
26
27 /**
28  * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
29  */
30 final class DiskFileReader extends FileReader {
31     private final FileChannel channel;
32     private final ByteBuffer buffer;
33
34     // tracks where memory's first available byte maps to in terms of FileChannel.position()
35     private int bufferPosition;
36
37     DiskFileReader(final Path path, final FileChannel channel, final int maxEntrySize) {
38         super(path);
39         this.channel = requireNonNull(channel);
40         buffer = ByteBuffer.allocate(chooseBufferSize(maxEntrySize)).flip();
41         bufferPosition = 0;
42     }
43
44     private static int chooseBufferSize(final int maxEntrySize) {
45         return (maxEntrySize + SegmentEntry.HEADER_BYTES) * 2;
46     }
47
48     @Override
49     void invalidateCache() {
50         buffer.clear().flip();
51         bufferPosition = 0;
52     }
53
54     @Override
55     ByteBuffer read(final int position, final int size) {
56         // calculate logical seek distance between buffer's first byte and position and split flow between
57         // forward-moving and backwards-moving code paths.
58         final int seek = bufferPosition - position;
59         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
60     }
61
62     private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
63         final int missing = buffer.limit() - seek - size;
64         if (missing <= 0) {
65             // fast path: we have the requested region
66             return buffer.slice(seek, size).asReadOnlyBuffer();
67         }
68
69         // We need to read more data, but let's salvage what we can:
70         // - set buffer position to seek, which means it points to the same as position
71         // - run compact, which moves everything between position and limit onto the beginning of buffer and
72         //   sets it up to receive more bytes
73         // - start the read accounting for the seek
74         buffer.position(seek).compact();
75         readAtLeast(position + seek, missing);
76         return setAndSlice(position, size);
77     }
78
79     private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
80         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
81         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
82         //       read it.
83         buffer.clear();
84         readAtLeast(position, size);
85         return setAndSlice(position, size);
86     }
87
88     private void readAtLeast(final int readPosition, final int readAtLeast) {
89         final int bytesRead;
90         try {
91             bytesRead = channel.read(buffer, readPosition);
92         } catch (IOException e) {
93             throw new StorageException(e);
94         }
95         verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
96         buffer.flip();
97     }
98
99     private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
100         bufferPosition = position;
101         return buffer.slice(0, size).asReadOnlyBuffer();
102     }
103 }