2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static java.util.Objects.requireNonNull;
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import java.util.NoSuchElementException;
23 import org.eclipse.jdt.annotation.Nullable;
25 abstract sealed class JournalSegmentReader<E> permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
26 final int maxEntrySize;
27 private final JournalIndex index;
28 final JournalSerdes namespace;
29 private final long firstIndex;
30 private final JournalSegment<E> segment;
32 private Indexed<E> currentEntry;
33 private Indexed<E> nextEntry;
35 JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
36 final JournalSerdes namespace) {
37 this.segment = requireNonNull(segment);
38 this.maxEntrySize = maxEntrySize;
39 this.index = requireNonNull(index);
40 this.namespace = requireNonNull(namespace);
41 firstIndex = segment.index();
45 * Returns the current reader index.
47 * @return The current reader index.
49 final long getCurrentIndex() {
50 return currentEntry != null ? currentEntry.index() : 0;
54 * Returns the last read entry.
56 * @return The last read entry.
58 final Indexed<E> getCurrentEntry() {
63 * Returns the next reader index.
65 * @return The next reader index.
67 final long getNextIndex() {
68 return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
72 * Returns whether the reader has a next entry to read.
74 * @return Whether the reader has a next entry to read.
76 final boolean hasNext() {
77 return nextEntry != null || (nextEntry = readNext()) != null;
81 * Returns the next entry in the reader.
83 * @return The next entry in the reader.
84 * @throws UnsupportedOperationException if there is no such entry
86 final Indexed<E> next() {
88 throw new NoSuchElementException();
91 // Set the current entry to the next entry.
92 currentEntry = nextEntry;
94 // Reset the next entry to null.
97 // Read the next entry in the segment.
98 nextEntry = readNext();
100 // Return the current entry.
105 * Resets the reader to the start of the segment.
110 setPosition(JournalSegmentDescriptor.BYTES);
111 nextEntry = readNext();
115 * Resets the reader to the given index.
117 * @param index The index to which to reset the reader.
119 final void reset(final long index) {
121 Position position = this.index.lookup(index - 1);
122 if (position != null) {
123 // FIXME: why do we need a 'null'-based entry here?
124 currentEntry = new Indexed<>(position.index() - 1, null, 0);
125 setPosition(position.position());
126 nextEntry = readNext();
128 while (getNextIndex() < index && hasNext()) {
137 segment.closeReader(this);
141 * Set the file position.
143 * @param position new position
145 abstract void setPosition(int position);
148 * Reads the entry at specified index.
150 * @param index entry index
151 * @return The entry, or {@code null}
153 abstract @Nullable Indexed<E> readEntry(long index);
155 private @Nullable Indexed<E> readNext() {
156 // Compute the index of the next entry in the segment.
157 return readEntry(getNextIndex());