JournalIndex.truncate() should return last entry
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 /**
22  * A {@link JournalReader} traversing all entries.
23  */
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25   final SegmentedJournal<E> journal;
26   private JournalSegment<E> currentSegment;
27   private Indexed<E> previousEntry;
28   private JournalSegmentReader<E> currentReader;
29
30   SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
31     this.journal = requireNonNull(journal);
32     currentSegment = requireNonNull(segment);
33     currentReader = segment.createReader();
34   }
35
36   @Override
37   public final long getFirstIndex() {
38     return journal.getFirstSegment().index();
39   }
40
41   @Override
42   public final long getCurrentIndex() {
43     final var currentEntry = currentReader.getCurrentEntry();
44     if (currentEntry != null) {
45       final long currentIndex = currentEntry.index();
46       if (currentIndex != 0) {
47         return currentIndex;
48       }
49     }
50     return previousEntry != null ? previousEntry.index() : 0;
51   }
52
53   @Override
54   public final Indexed<E> getCurrentEntry() {
55     // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
56     // That segment may be empty, though, in which case we need to report the previousEntry.
57     final Indexed<E> currentEntry;
58     return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
59   }
60
61   @Override
62   public final long getNextIndex() {
63     return currentReader.getNextIndex();
64   }
65
66   @Override
67   public final void reset() {
68     previousEntry = null;
69     currentReader.close();
70
71     currentSegment = journal.getFirstSegment();
72     currentReader = currentSegment.createReader();
73   }
74
75   @Override
76   public final void reset(long index) {
77     // If the current segment is not open, it has been replaced. Reset the segments.
78     if (!currentSegment.isOpen()) {
79       reset();
80     }
81
82     final var nextIndex = currentReader.getNextIndex();
83     if (index < nextIndex) {
84       rewind(index);
85     } else if (index > nextIndex) {
86       forward(index);
87     } else {
88       currentReader.reset(index);
89     }
90   }
91
92   /**
93    * Rewinds the journal to the given index.
94    */
95   private void rewind(long index) {
96     if (currentSegment.index() >= index) {
97       JournalSegment<E> segment = journal.getSegment(index - 1);
98       if (segment != null) {
99         currentReader.close();
100
101         currentSegment = segment;
102         currentReader = currentSegment.createReader();
103       }
104     }
105
106     currentReader.reset(index);
107     previousEntry = currentReader.getCurrentEntry();
108   }
109
110   /**
111    * Fast forwards the journal to the given index.
112    */
113   private void forward(long index) {
114     while (getNextIndex() < index && tryNext() != null) {
115       // Nothing else
116     }
117   }
118
119   @Override
120   public Indexed<E> tryNext() {
121     if (currentReader.hasNext()) {
122       previousEntry = currentReader.getCurrentEntry();
123       return currentReader.next();
124     }
125
126     final var nextSegment = journal.getNextSegment(currentSegment.index());
127     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
128       return null;
129     }
130
131     previousEntry = currentReader.getCurrentEntry();
132     currentReader.close();
133
134     currentSegment = nextSegment;
135     currentReader = currentSegment.createReader();
136     return currentReader.hasNext() ? currentReader.next() : null;
137   }
138
139   @Override
140   public final void close() {
141     currentReader.close();
142     journal.closeReader(this);
143   }
144 }