Improve segmented journal actor metrics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22 import io.netty.buffer.Unpooled;
23 import java.util.zip.CRC32;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class JournalSegmentReader {
29     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
30
31     private final JournalSegment segment;
32     private final FileReader fileReader;
33     private final int maxSegmentSize;
34     private final int maxEntrySize;
35
36     private int position;
37
38     JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
39         this.segment = requireNonNull(segment);
40         this.fileReader = requireNonNull(fileReader);
41         maxSegmentSize = segment.descriptor().maxSegmentSize();
42         this.maxEntrySize = maxEntrySize;
43     }
44
45     /**
46      * Return the current position.
47      *
48      * @return current position.
49      */
50     int position() {
51         return position;
52     }
53
54     /**
55      * Set the file position.
56      *
57      * @param position new position
58      */
59     void setPosition(final int position) {
60         verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
61             "Invalid position %s", position);
62         this.position = position;
63         fileReader.invalidateCache();
64     }
65
66     /**
67      * Invalidate any cache that is present, so that the next read is coherent with the backing file.
68      */
69     void invalidateCache() {
70         fileReader.invalidateCache();
71     }
72
73     /**
74      * Reads the next binary data block
75      *
76      * @param index entry index
77      * @return The binary data, or {@code null}
78      */
79     @Nullable ByteBuf readBytes(final long index) {
80         // Check if there is enough in the buffer remaining
81         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
82         if (remaining < 0) {
83             // Not enough space in the segment, there can never be another entry
84             return null;
85         }
86
87         // Calculate maximum entry length not exceeding file size nor maxEntrySize
88         final var maxLength = Math.min(remaining, maxEntrySize);
89         final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
90
91         // Read the entry length
92         final var length = buffer.getInt(0);
93         if (length < 1 || length > maxLength) {
94             // Invalid length, make sure next read re-tries
95             invalidateCache();
96             return null;
97         }
98
99         // Read the entry checksum
100         final int checksum = buffer.getInt(Integer.BYTES);
101
102         // Slice off the entry's bytes
103         final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
104         // Compute the checksum for the entry bytes.
105         final var crc32 = new CRC32();
106         crc32.update(entryBuffer);
107
108         // If the stored checksum does not equal the computed checksum, do not proceed further
109         final var computed = (int) crc32.getValue();
110         if (checksum != computed) {
111             LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
112             invalidateCache();
113             return null;
114         }
115
116         // update position
117         position += SegmentEntry.HEADER_BYTES + length;
118
119         // return bytes
120         entryBuffer.rewind();
121         return Unpooled.buffer(length).writeBytes(entryBuffer);
122     }
123
124     /**
125      * Close this reader.
126      */
127     void close() {
128         segment.closeReader(this);
129     }
130 }