JournalIndex.truncate() should return last entry
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static java.util.Objects.requireNonNull;
19
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import java.util.NoSuchElementException;
23 import org.eclipse.jdt.annotation.Nullable;
24
25 abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
26     final int maxEntrySize;
27     private final JournalIndex index;
28     final JournalSerdes namespace;
29     private final long firstIndex;
30     private final JournalSegment<E> segment;
31
32     private Indexed<E> currentEntry;
33     private Indexed<E> nextEntry;
34
35     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
36             final JournalSerdes namespace) {
37         this.segment = requireNonNull(segment);
38         this.maxEntrySize = maxEntrySize;
39         this.index = requireNonNull(index);
40         this.namespace = requireNonNull(namespace);
41         firstIndex = segment.index();
42     }
43
44     /**
45      * Returns the last read entry.
46      *
47      * @return The last read entry.
48      */
49     final Indexed<E> getCurrentEntry() {
50         return currentEntry;
51     }
52
53     /**
54      * Returns the next reader index.
55      *
56      * @return The next reader index.
57      */
58     final long getNextIndex() {
59         return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
60     }
61
62     /**
63      * Returns whether the reader has a next entry to read.
64      *
65      * @return Whether the reader has a next entry to read.
66      */
67     final boolean hasNext() {
68         return nextEntry != null || (nextEntry = readNext()) != null;
69     }
70
71     /**
72      * Returns the next entry in the reader.
73      *
74      * @return The next entry in the reader.
75      * @throws UnsupportedOperationException if there is no such entry
76      */
77     final Indexed<E> next() {
78         if (!hasNext()) {
79             throw new NoSuchElementException();
80         }
81
82         // Set the current entry to the next entry.
83         currentEntry = nextEntry;
84
85         // Reset the next entry to null.
86         nextEntry = null;
87
88         // Read the next entry in the segment.
89         nextEntry = readNext();
90
91         // Return the current entry.
92         return currentEntry;
93     }
94
95     /**
96      * Resets the reader to the start of the segment.
97      */
98     final void reset() {
99         currentEntry = null;
100         nextEntry = null;
101         setPosition(JournalSegmentDescriptor.BYTES);
102         nextEntry = readNext();
103     }
104
105     /**
106      * Resets the reader to the given index.
107      *
108      * @param index The index to which to reset the reader.
109      */
110     final void reset(final long index) {
111         reset();
112         Position position = this.index.lookup(index - 1);
113         if (position != null) {
114             // FIXME: why do we need a 'null'-based entry here?
115             currentEntry = new Indexed<>(position.index() - 1, null, 0);
116             setPosition(position.position());
117             nextEntry = readNext();
118         }
119         while (getNextIndex() < index && hasNext()) {
120             next();
121         }
122     }
123
124     /**
125      * Close this reader.
126      */
127     final void close() {
128         segment.closeReader(this);
129     }
130
131     /**
132      * Set the file position.
133      *
134      * @param position new position
135      */
136     abstract void setPosition(int position);
137
138     /**
139      * Reads the entry at specified index.
140      *
141      * @param index entry index
142      * @return The entry, or {@code null}
143      */
144     abstract @Nullable Indexed<E> readEntry(long index);
145
146     private @Nullable Indexed<E> readNext() {
147         // Compute the index of the next entry in the segment.
148         return readEntry(getNextIndex());
149     }
150 }