Refactor JournalReader.tryNext()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import org.eclipse.jdt.annotation.NonNull;
22
23 /**
24  * A {@link JournalReader} traversing all entries.
25  */
26 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
27     // Marker non-null object for tryAdvance()
28     private static final @NonNull Object ADVANCED = new Object();
29
30     final SegmentedJournal<E> journal;
31
32     private JournalSegment currentSegment;
33     private JournalSegmentReader currentReader;
34     private long nextIndex;
35
36     SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
37         this.journal = requireNonNull(journal);
38         currentSegment = requireNonNull(segment);
39         currentReader = segment.createReader();
40         nextIndex = currentSegment.firstIndex();
41     }
42
43     @Override
44     public final long getFirstIndex() {
45         return journal.getFirstSegment().firstIndex();
46     }
47
48     @Override
49     public final long getNextIndex() {
50         return nextIndex;
51     }
52
53     @Override
54     public final void reset() {
55         currentReader.close();
56
57         currentSegment = journal.getFirstSegment();
58         currentReader = currentSegment.createReader();
59         nextIndex = currentSegment.firstIndex();
60     }
61
62     @Override
63     public final void reset(final long index) {
64         // If the current segment is not open, it has been replaced. Reset the segments.
65         if (!currentSegment.isOpen()) {
66             reset();
67         }
68
69         if (index < nextIndex) {
70             rewind(index);
71         } else if (index > nextIndex) {
72             while (index > nextIndex && tryAdvance()) {
73                 // Nothing else
74             }
75         } else {
76             resetCurrentReader(index);
77         }
78     }
79
80     private void resetCurrentReader(final long index) {
81         final var position = currentSegment.lookup(index - 1);
82         if (position != null) {
83             nextIndex = position.index();
84             currentReader.setPosition(position.position());
85         } else {
86             nextIndex = currentSegment.firstIndex();
87             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
88         }
89         while (nextIndex < index && tryAdvance()) {
90             // Nothing else
91         }
92     }
93
94     /**
95      * Rewinds the journal to the given index.
96      */
97     private void rewind(final long index) {
98         if (currentSegment.firstIndex() >= index) {
99             JournalSegment segment = journal.getSegment(index - 1);
100             if (segment != null) {
101                 currentReader.close();
102
103                 currentSegment = segment;
104                 currentReader = currentSegment.createReader();
105             }
106         }
107
108         resetCurrentReader(index);
109     }
110
111     @Override
112     public <T> T tryNext(final EntryMapper<E, T> mapper) {
113         final var index = nextIndex;
114         var buf = currentReader.readBytes(index);
115         if (buf == null) {
116             final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
117             if (nextSegment == null || nextSegment.firstIndex() != index) {
118                 return null;
119             }
120
121             currentReader.close();
122
123             currentSegment = nextSegment;
124             currentReader = currentSegment.createReader();
125             buf = currentReader.readBytes(index);
126             if (buf == null) {
127                 return null;
128             }
129         }
130
131         final var entry = journal.serializer().deserialize(buf);
132         final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes()));
133         nextIndex = index + 1;
134         return ret;
135     }
136
137     /**
138      * Try to move to the next entry.
139      *
140      * @return {@code true} if there was a next entry and this reader has moved to it
141      */
142     final boolean tryAdvance() {
143         return tryNext((index, entry, size) -> ADVANCED) != null;
144     }
145
146     @Override
147     public final void close() {
148         currentReader.close();
149         journal.closeReader(this);
150     }
151 }