2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 final class JournalSegmentReader {
// Class-wide SLF4J logger; readBytes() uses it to warn when a stored checksum does not match the computed one.
27 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
// Segment this reader belongs to: the constructor reads the maximum file size from it, and the reader passes
// itself to segment.closeReader(this).
29 private final JournalSegment segment;
// Maximum size of the segment file, captured once from segment.file().maxSize() in the constructor.
30 private final int maxSegmentSize;
// Constructor-supplied cap on the length of a single entry; applied in readBytes().
31 private final int maxEntrySize;
// Reader used for every file read and cache invalidation in this class.
// NOTE(review): declared non-final, but the code that reassigns it is not visible in this extract.
33 private FileReader fileReader;
36 JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
    // Creates a reader for the given segment that performs its reads through the supplied FileReader.
    // Both object arguments are mandatory: requireNonNull() throws NullPointerException for a null value.
37 this.segment = requireNonNull(segment);
38 this.fileReader = requireNonNull(fileReader);
    // Capture the segment file's maximum size once; setPosition() and readBytes() use it as their upper bound.
39 maxSegmentSize = segment.file().maxSize();
    // Stored as given, without validation; readBytes() uses it to cap the length of a single entry.
40 this.maxEntrySize = maxEntrySize;
44 * Return the current position.
46 * @return current position.
53 * Set the file position.
55 * @param position new position
57 void setPosition(final int position) {
    // Accept only positions that are at least JournalSegmentDescriptor.BYTES and strictly below the maximum
    // segment size. Guava's verify() throws VerifyException otherwise, with the offending value formatted into
    // the message.
58 verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
59 "Invalid position %s", position);
    // NOTE(review): the declaration of the 'position' field is not visible in this extract.
60 this.position = position;
    // The position has changed, so ask the FileReader to invalidate its cache.
61 fileReader.invalidateCache();
65 * Invalidate any cache that is present, so that the next read is coherent with the backing file.
67 void invalidateCache() {
    // Pure delegation: the only visible statement forwards the request to the underlying FileReader.
68 fileReader.invalidateCache();
72 * Reads the next binary data block.
74 * @return The binary data, or {@code null}
76 @Nullable ByteBuf readBytes() {
    // Entry layout as read below: an int length at offset 0, an int checksum at offset Integer.BYTES, and the
    // payload starting at offset SegmentEntry.HEADER_BYTES.
77 // Check if there is enough in the buffer remaining
    // 'remaining' is the space left in the segment after the current position, minus one entry header.
78 final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
80 // Not enough space in the segment, there can never be another entry
    // NOTE(review): the guard that acts on 'remaining' is not visible in this extract; confirm against the full file.
84 // Calculate maximum entry length not exceeding file size nor maxEntrySize
85 final var maxLength = Math.min(remaining, maxEntrySize);
    // Request the header plus the largest payload that could follow it, passing the current position.
86 final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
88 // Read the entry length
89 final var length = buffer.getInt(0);
    // A usable length lies between 1 and maxLength inclusive; anything else takes the branch below.
90 if (length < 1 || length > maxLength) {
91 // Invalid length, make sure next read re-tries
    // NOTE(review): the statements of this branch and its closing brace are not visible in this extract.
96 // Read the entry checksum
97 final int checksum = buffer.getInt(Integer.BYTES);
99 // Slice off the entry's bytes
100 final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
101 // If the stored checksum does not equal the computed checksum, do not proceed further
    // The checksum is computed over the payload slice only (the header is excluded), through its NIO buffer view.
102 final var computed = SegmentEntry.computeChecksum(entryBuffer.nioBuffer());
103 if (checksum != computed) {
    // Both the stored and the computed value are logged in hexadecimal.
104 LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
    // NOTE(review): the remainder of this branch is not visible in this extract.
109 // update position and return
    // Advance the position past the header and payload of the entry just validated.
110 position += SegmentEntry.HEADER_BYTES + length;
    // NOTE(review): the return statement and the method's closing brace are not visible in this extract.
118 final var local = fileReader;
122 segment.closeReader(this);