2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
21 import com.esotericsoftware.kryo.KryoException;
22 import java.util.zip.CRC32;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 final class JournalSegmentReader<E> {
28 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
30 private final JournalSegment<E> segment;
31 private final JournalSerdes namespace;
32 private final FileReader fileReader;
33 private final int maxSegmentSize;
34 private final int maxEntrySize;
38 JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
39 final int maxEntrySize, final JournalSerdes namespace) {
40 this.segment = requireNonNull(segment);
41 this.fileReader = requireNonNull(fileReader);
42 maxSegmentSize = segment.descriptor().maxSegmentSize();
43 this.maxEntrySize = maxEntrySize;
44 this.namespace = requireNonNull(namespace);
48 * Return the current position.
50 * @return current position.
57 * Set the file position.
59 * @param position new position
61 void setPosition(final int position) {
62 verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
63 "Invalid position %s", position);
64 this.position = position;
65 fileReader.invalidateCache();
69 * Invalidate any cache that is present, so that the next read is coherent with the backing file.
71 void invalidateCache() {
72 fileReader.invalidateCache();
76 * Reads the next entry, assigning it specified index.
78 * @param index entry index
79 * @return The entry, or {@code null}
81 @Nullable Indexed<E> readEntry(final long index) {
82 // Check if there is enough in the buffer remaining
83 final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
85 // Not enough space in the segment, there can never be another entry
89 // Calculate maximum entry length not exceeding file size nor maxEntrySize
90 final var maxLength = Math.min(remaining, maxEntrySize);
91 final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
93 // Read the entry length
94 final var length = buffer.getInt(0);
95 if (length < 1 || length > maxLength) {
96 // Invalid length, make sure next read re-tries
101 // Read the entry checksum
102 final int checksum = buffer.getInt(Integer.BYTES);
104 // Slice off the entry's bytes
105 final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length);
106 // Compute the checksum for the entry bytes.
107 final var crc32 = new CRC32();
108 crc32.update(entryBytes);
110 // If the stored checksum does not equal the computed checksum, do not proceed further
111 final var computed = (int) crc32.getValue();
112 if (checksum != computed) {
113 LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
118 // Attempt to deserialize
121 entry = namespace.deserialize(entryBytes.rewind());
122 } catch (KryoException e) {
123 // TODO: promote this to a hard error, as it should never happen
124 LOG.debug("Failed to deserialize entry", e);
129 // We are all set. Update the position.
130 position = position + SegmentEntry.HEADER_BYTES + length;
131 return new Indexed<>(index, entry, length);
138 segment.closeReader(this);