/*
 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Path;
25 import org.eclipse.jdt.annotation.NonNull;
28 * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
30 final class DiskFileReader extends FileReader {
32 * Just do not bother with IO smaller than this many bytes.
34 private static final int MIN_IO_SIZE = 8192;
36 private final FileChannel channel;
37 private final ByteBuffer buffer;
39 // tracks where memory's first available byte maps to in terms of FileChannel.position()
40 private int bufferPosition;
42 DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
43 this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize));
46 // Note: take ownership of the buffer
47 DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) {
49 this.channel = requireNonNull(channel);
50 this.buffer = buffer.flip();
54 static ByteBuffer allocateBuffer(final int maxSegmentSize, final int maxEntrySize) {
55 return ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize));
58 private static int chooseBufferSize(final int maxSegmentSize, final int maxEntrySize) {
59 if (maxSegmentSize <= MIN_IO_SIZE) {
60 // just buffer the entire segment
61 return maxSegmentSize;
64 // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
65 final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
66 return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
70 void invalidateCache() {
71 buffer.clear().flip();
76 ByteBuffer read(final int position, final int size) {
77 // calculate logical seek distance between buffer's first byte and position and split flow between
78 // forward-moving and backwards-moving code paths.
79 final int seek = bufferPosition - position;
80 return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
83 private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
84 final int missing = buffer.limit() - seek - size;
86 // fast path: we have the requested region
87 return buffer.slice(seek, size).asReadOnlyBuffer();
90 // We need to read more data, but let's salvage what we can:
91 // - set buffer position to seek, which means it points to the same as position
92 // - run compact, which moves everything between position and limit onto the beginning of buffer and
93 // sets it up to receive more bytes
94 // - start the read accounting for the seek
95 buffer.position(seek).compact();
96 readAtLeast(position + seek, missing);
97 return setAndSlice(position, size);
100 private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
101 // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
102 // do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
105 readAtLeast(position, size);
106 return setAndSlice(position, size);
109 private void readAtLeast(final int readPosition, final int readAtLeast) {
112 bytesRead = channel.read(buffer, readPosition);
113 } catch (IOException e) {
114 throw new StorageException(e);
116 verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
120 private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
121 bufferPosition = position;
122 return buffer.slice(0, size).asReadOnlyBuffer();