Separate out RaftEntryMeta
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 final class JournalSegmentReader {
27     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
28
29     private final JournalSegment segment;
30     private final int maxSegmentSize;
31     private final int maxEntrySize;
32
33     private FileReader fileReader;
34     private int position;
35
36     JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
37         this.segment = requireNonNull(segment);
38         this.fileReader = requireNonNull(fileReader);
39         maxSegmentSize = segment.file().maxSize();
40         this.maxEntrySize = maxEntrySize;
41     }
42
43     /**
44      * Return the current position.
45      *
46      * @return current position.
47      */
48     int position() {
49         return position;
50     }
51
52     /**
53      * Set the file position.
54      *
55      * @param position new position
56      */
57     void setPosition(final int position) {
58         verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
59             "Invalid position %s", position);
60         this.position = position;
61         fileReader.invalidateCache();
62     }
63
64     /**
65      * Invalidate any cache that is present, so that the next read is coherent with the backing file.
66      */
67     void invalidateCache() {
68         fileReader.invalidateCache();
69     }
70
71     /**
72      * Reads the next binary data block.
73      *
74      * @return The binary data, or {@code null}
75      */
76     @Nullable ByteBuf readBytes() {
77         // Check if there is enough in the buffer remaining
78         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
79         if (remaining < 0) {
80             // Not enough space in the segment, there can never be another entry
81             return null;
82         }
83
84         // Calculate maximum entry length not exceeding file size nor maxEntrySize
85         final var maxLength = Math.min(remaining, maxEntrySize);
86         final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
87
88         // Read the entry length
89         final var length = buffer.getInt(0);
90         if (length < 1 || length > maxLength) {
91             // Invalid length, make sure next read re-tries
92             invalidateCache();
93             return null;
94         }
95
96         // Read the entry checksum
97         final int checksum = buffer.getInt(Integer.BYTES);
98
99         // Slice off the entry's bytes
100         final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
101         // If the stored checksum does not equal the computed checksum, do not proceed further
102         final var computed = SegmentEntry.computeChecksum(entryBuffer.nioBuffer());
103         if (checksum != computed) {
104             LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
105             invalidateCache();
106             return null;
107         }
108
109         // update position and return
110         position += SegmentEntry.HEADER_BYTES + length;
111         return entryBuffer;
112     }
113
114     /**
115      * Close this reader.
116      */
117     void close() {
118         final var local = fileReader;
119         if (local != null) {
120             fileReader = null;
121             local.release();
122             segment.closeReader(this);
123         }
124     }
125 }