4d63eaf550edffbfd74020517f8d0214020fba46
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static java.util.Objects.requireNonNull;
19
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import java.util.NoSuchElementException;
23 import org.eclipse.jdt.annotation.Nullable;
24
25 abstract sealed class JournalSegmentReader<E> permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
26     final int maxEntrySize;
27     private final JournalIndex index;
28     final JournalSerdes namespace;
29     private final long firstIndex;
30     private final JournalSegment<E> segment;
31
32     private Indexed<E> currentEntry;
33     private Indexed<E> nextEntry;
34
35     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
36             final JournalSerdes namespace) {
37         this.segment = requireNonNull(segment);
38         this.maxEntrySize = maxEntrySize;
39         this.index = requireNonNull(index);
40         this.namespace = requireNonNull(namespace);
41         firstIndex = segment.index();
42     }
43
44     /**
45      * Returns the current reader index.
46      *
47      * @return The current reader index.
48      */
49     final long getCurrentIndex() {
50         return currentEntry != null ? currentEntry.index() : 0;
51     }
52
53     /**
54      * Returns the last read entry.
55      *
56      * @return The last read entry.
57      */
58     final Indexed<E> getCurrentEntry() {
59         return currentEntry;
60     }
61
62     /**
63      * Returns the next reader index.
64      *
65      * @return The next reader index.
66      */
67     final long getNextIndex() {
68         return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
69     }
70
71     /**
72      * Returns whether the reader has a next entry to read.
73      *
74      * @return Whether the reader has a next entry to read.
75      */
76     final boolean hasNext() {
77         return nextEntry != null || (nextEntry = readNext()) != null;
78     }
79
80     /**
81      * Returns the next entry in the reader.
82      *
83      * @return The next entry in the reader.
84      * @throws UnsupportedOperationException if there is no such entry
85      */
86     final Indexed<E> next() {
87         if (!hasNext()) {
88             throw new NoSuchElementException();
89         }
90
91         // Set the current entry to the next entry.
92         currentEntry = nextEntry;
93
94         // Reset the next entry to null.
95         nextEntry = null;
96
97         // Read the next entry in the segment.
98         nextEntry = readNext();
99
100         // Return the current entry.
101         return currentEntry;
102     }
103
104     /**
105      * Resets the reader to the start of the segment.
106      */
107     final void reset() {
108         currentEntry = null;
109         nextEntry = null;
110         setPosition(JournalSegmentDescriptor.BYTES);
111         nextEntry = readNext();
112     }
113
114     /**
115      * Resets the reader to the given index.
116      *
117      * @param index The index to which to reset the reader.
118      */
119     final void reset(final long index) {
120         reset();
121         Position position = this.index.lookup(index - 1);
122         if (position != null) {
123             // FIXME: why do we need a 'null'-based entry here?
124             currentEntry = new Indexed<>(position.index() - 1, null, 0);
125             setPosition(position.position());
126             nextEntry = readNext();
127         }
128         while (getNextIndex() < index && hasNext()) {
129             next();
130         }
131     }
132
133     /**
134      * Close this reader.
135      */
136     final void close() {
137         segment.closeReader(this);
138     }
139
140     /**
141      * Set the file position.
142      *
143      * @param position new position
144      */
145     abstract void setPosition(int position);
146
147     /**
148      * Reads the entry at specified index.
149      *
150      * @param index entry index
151      * @return The entry, or {@code null}
152      */
153     abstract @Nullable Indexed<E> readEntry(long index);
154
155     private @Nullable Indexed<E> readNext() {
156         // Compute the index of the next entry in the segment.
157         return readEntry(getNextIndex());
158     }
159 }