2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static java.util.Objects.requireNonNull;
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import java.util.NoSuchElementException;
23 import org.eclipse.jdt.annotation.Nullable;
25 abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
26 final int maxEntrySize;
27 private final JournalIndex index;
28 final JournalSerdes namespace;
29 private final long firstIndex;
30 private final JournalSegment<E> segment;
32 private Indexed<E> currentEntry;
33 private Indexed<E> nextEntry;
35 JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
36 final JournalSerdes namespace) {
37 this.segment = requireNonNull(segment);
38 this.maxEntrySize = maxEntrySize;
39 this.index = requireNonNull(index);
40 this.namespace = requireNonNull(namespace);
41 firstIndex = segment.index();
45 * Returns the last read entry.
47 * @return The last read entry.
49 final Indexed<E> getCurrentEntry() {
54 * Returns the next reader index.
56 * @return The next reader index.
58 final long getNextIndex() {
59 return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
63 * Returns whether the reader has a next entry to read.
65 * @return Whether the reader has a next entry to read.
67 final boolean hasNext() {
68 return nextEntry != null || (nextEntry = readNext()) != null;
72 * Returns the next entry in the reader.
74 * @return The next entry in the reader.
75 * @throws NoSuchElementException if there is no such entry
77 final Indexed<E> next() {
79 throw new NoSuchElementException();
82 // Set the current entry to the next entry.
83 currentEntry = nextEntry;
85 // Reset the next entry to null.
88 // Read the next entry in the segment.
89 nextEntry = readNext();
91 // Return the current entry.
96 * Resets the reader to the start of the segment.
101 setPosition(JournalSegmentDescriptor.BYTES);
102 nextEntry = readNext();
106 * Resets the reader to the given index.
108 * @param index The index to which to reset the reader.
110 final void reset(final long index) {
112 Position position = this.index.lookup(index - 1);
113 if (position != null) {
114 // FIXME: why do we need a 'null'-based entry here?
115 currentEntry = new Indexed<>(position.index() - 1, null, 0);
116 setPosition(position.position());
117 nextEntry = readNext();
119 while (getNextIndex() < index && hasNext()) {
128 segment.closeReader(this);
132 * Set the file position.
134 * @param position new position
136 abstract void setPosition(int position);
139 * Reads the entry at the specified index.
141 * @param index entry index
142 * @return The entry, or {@code null}
144 abstract @Nullable Indexed<E> readEntry(long index);
146 private @Nullable Indexed<E> readNext() {
147 // Compute the index of the next entry in the segment.
148 return readEntry(getNextIndex());