Factor out FileReader interface
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Verify.verify;
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.KryoException;
22 import java.util.zip.CRC32;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 final class JournalSegmentReader<E> {
28     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
29
30     private final JournalSegment<E> segment;
31     private final JournalSerdes namespace;
32     private final FileReader fileReader;
33     private final int maxSegmentSize;
34     private final int maxEntrySize;
35
36     private int position;
37
38     JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
39             final int maxEntrySize, final JournalSerdes namespace) {
40         this.segment = requireNonNull(segment);
41         this.fileReader = requireNonNull(fileReader);
42         maxSegmentSize = segment.descriptor().maxSegmentSize();
43         this.maxEntrySize = maxEntrySize;
44         this.namespace = requireNonNull(namespace);
45     }
46
47     /**
48      * Return the current position.
49      *
50      * @return current position.
51      */
52     int position() {
53         return position;
54     }
55
56     /**
57      * Set the file position.
58      *
59      * @param position new position
60      */
61     void setPosition(final int position) {
62         verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
63             "Invalid position %s", position);
64         this.position = position;
65         fileReader.invalidateCache();
66     }
67
68     /**
69      * Invalidate any cache that is present, so that the next read is coherent with the backing file.
70      */
71     void invalidateCache() {
72         fileReader.invalidateCache();
73     }
74
75     /**
76      * Reads the next entry, assigning it specified index.
77      *
78      * @param index entry index
79      * @return The entry, or {@code null}
80      */
81     @Nullable Indexed<E> readEntry(final long index) {
82         // Check if there is enough in the buffer remaining
83         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
84         if (remaining < 0) {
85             // Not enough space in the segment, there can never be another entry
86             return null;
87         }
88
89         // Calculate maximum entry length not exceeding file size nor maxEntrySize
90         final var maxLength = Math.min(remaining, maxEntrySize);
91         final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
92
93         // Read the entry length
94         final var length = buffer.getInt(0);
95         if (length < 1 || length > maxLength) {
96             // Invalid length, make sure next read re-tries
97             invalidateCache();
98             return null;
99         }
100
101         // Read the entry checksum
102         final int checksum = buffer.getInt(Integer.BYTES);
103
104         // Slice off the entry's bytes
105         final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length);
106         // Compute the checksum for the entry bytes.
107         final var crc32 = new CRC32();
108         crc32.update(entryBytes);
109
110         // If the stored checksum does not equal the computed checksum, do not proceed further
111         final var computed = (int) crc32.getValue();
112         if (checksum != computed) {
113             LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
114             invalidateCache();
115             return null;
116         }
117
118         // Attempt to deserialize
119         final E entry;
120         try {
121             entry = namespace.deserialize(entryBytes.rewind());
122         } catch (KryoException e) {
123             // TODO: promote this to a hard error, as it should never happen
124             LOG.debug("Failed to deserialize entry", e);
125             invalidateCache();
126             return null;
127         }
128
129         // We are all set. Update the position.
130         position = position + SegmentEntry.HEADER_BYTES + length;
131         return new Indexed<>(index, entry, length);
132     }
133
134     /**
135      * Close this reader.
136      */
137     void close() {
138         segment.closeReader(this);
139     }
140 }