Move entry tracking to SegmentedJournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 /**
22  * A {@link JournalReader} traversing all entries.
23  */
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25     final SegmentedJournal<E> journal;
26
27     private JournalSegment<E> currentSegment;
28     private JournalSegmentReader<E> currentReader;
29     private Indexed<E> currentEntry;
30     private long nextIndex;
31
32     SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
33         this.journal = requireNonNull(journal);
34         currentSegment = requireNonNull(segment);
35         currentReader = segment.createReader();
36         nextIndex = currentSegment.index();
37         currentEntry = null;
38     }
39
40     @Override
41     public final long getFirstIndex() {
42         return journal.getFirstSegment().index();
43     }
44
45     @Override
46     public final long getCurrentIndex() {
47         return currentEntry != null ? currentEntry.index() : 0;
48     }
49
50     @Override
51     public final Indexed<E> getCurrentEntry() {
52         return currentEntry;
53     }
54
55     @Override
56     public final long getNextIndex() {
57         return nextIndex;
58     }
59
60     @Override
61     public final void reset() {
62         currentReader.close();
63
64         currentSegment = journal.getFirstSegment();
65         currentReader = currentSegment.createReader();
66         nextIndex = currentSegment.index();
67         currentEntry = null;
68     }
69
70     @Override
71     public final void reset(final long index) {
72         // If the current segment is not open, it has been replaced. Reset the segments.
73         if (!currentSegment.isOpen()) {
74             reset();
75         }
76
77         if (index < nextIndex) {
78             rewind(index);
79         } else if (index > nextIndex) {
80             while (index > nextIndex && tryNext() != null) {
81                 // Nothing else
82             }
83         } else {
84             resetCurrentReader(index);
85         }
86     }
87
88     private void resetCurrentReader(final long index) {
89         final var position = currentSegment.lookup(index - 1);
90         if (position != null) {
91             nextIndex = position.index();
92             currentReader.setPosition(position.position());
93         } else {
94             nextIndex = currentSegment.index();
95             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
96         }
97         while (nextIndex < index && tryNext() != null) {
98             // Nothing else
99         }
100     }
101
102     /**
103      * Rewinds the journal to the given index.
104      */
105     private void rewind(final long index) {
106         if (currentSegment.index() >= index) {
107             JournalSegment<E> segment = journal.getSegment(index - 1);
108             if (segment != null) {
109                 currentReader.close();
110
111                 currentSegment = segment;
112                 currentReader = currentSegment.createReader();
113             }
114         }
115
116         resetCurrentReader(index);
117     }
118
119     @Override
120     public Indexed<E> tryNext() {
121         var next = currentReader.readEntry(nextIndex);
122         if (next == null) {
123             final var nextSegment = journal.getNextSegment(currentSegment.index());
124             if (nextSegment == null || nextSegment.index() != nextIndex) {
125                 return null;
126             }
127
128             currentReader.close();
129
130             currentSegment = nextSegment;
131             currentReader = currentSegment.createReader();
132             next = currentReader.readEntry(nextIndex);
133             if (next == null) {
134                 return null;
135             }
136         }
137
138         nextIndex = nextIndex + 1;
139         currentEntry = next;
140         return next;
141     }
142
143     @Override
144     public final void close() {
145         currentReader.close();
146         journal.closeReader(this);
147     }
148 }