Free disk buffers
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskFileReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22 import java.io.IOException;
23 import java.nio.channels.FileChannel;
24 import org.eclipse.jdt.annotation.NonNull;
25
26 /**
27  * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer.
28  */
29 final class DiskFileReader extends FileReader {
30     private final FileChannel channel;
31
32     private ByteBuf buffer;
33     // tracks where memory's first available byte maps to in terms of FileChannel.position()
34     private int bufferPosition;
35
36     // Note: take ownership of the buffer
37     DiskFileReader(final JournalSegmentFile file, final ByteBuf buffer) {
38         super(file);
39         this.buffer = requireNonNull(buffer);
40         channel = file.channel();
41         bufferPosition = 0;
42     }
43
44     @Override
45     void invalidateCache() {
46         buffer.clear();
47         bufferPosition = 0;
48     }
49
50     @Override
51     ByteBuf read(final int position, final int size) {
52         // calculate logical seek distance between buffer's first byte and position and split flow between
53         // forward-moving and backwards-moving code paths.
54         final int seek = bufferPosition - position;
55         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
56     }
57
58     @Override
59     void release() {
60         final var local = buffer;
61         if (local != null) {
62             buffer = null;
63             local.release();
64         }
65     }
66
67     private @NonNull ByteBuf forwardAndRead(final int seek, final int position, final int size) {
68         final int remaining = buffer.writerIndex() - seek;
69         final int missing = remaining - size;
70         if (missing <= 0) {
71             // fast path: we have the requested region
72             return buffer.slice(seek, size).asReadOnly();
73         }
74
75         // We need to read more data, but let's salvage what we can:
76         // - set buffer position to seek, which means it points to the same as position
77         // - run compact, which moves everything between position and limit onto the beginning of buffer and
78         //   sets it up to receive more bytes
79         // - start the read accounting for the seek
80         buffer.writeBytes(buffer, seek, remaining);
81         readAtLeast(position + seek, missing);
82         return setAndSlice(position, size);
83     }
84
85     private @NonNull ByteBuf rewindAndRead(final int rewindBy, final int position, final int size) {
86         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
87         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
88         //       read it.
89         buffer.clear();
90         readAtLeast(position, size);
91         return setAndSlice(position, size);
92     }
93
94     private void readAtLeast(final int readPosition, final int readAtLeast) {
95         final int bytesRead;
96         try {
97             bytesRead = buffer.writeBytes(channel, readPosition, readAtLeast);
98         } catch (IOException e) {
99             throw new StorageException(e);
100         }
101         verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
102     }
103
104     private @NonNull ByteBuf setAndSlice(final int position, final int size) {
105         bufferPosition = position;
106         return buffer.slice(0, size).asReadOnly();
107     }
108 }