Remove JournalReader.getCurrentEntry()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 /**
22  * A {@link JournalReader} traversing all entries.
23  */
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25     final SegmentedJournal<E> journal;
26
27     private JournalSegment currentSegment;
28     private JournalSegmentReader currentReader;
29     private long nextIndex;
30
31     SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
32         this.journal = requireNonNull(journal);
33         currentSegment = requireNonNull(segment);
34         currentReader = segment.createReader();
35         nextIndex = currentSegment.firstIndex();
36     }
37
38     @Override
39     public final long getFirstIndex() {
40         return journal.getFirstSegment().firstIndex();
41     }
42
43     @Override
44     public final long getNextIndex() {
45         return nextIndex;
46     }
47
48     @Override
49     public final void reset() {
50         currentReader.close();
51
52         currentSegment = journal.getFirstSegment();
53         currentReader = currentSegment.createReader();
54         nextIndex = currentSegment.firstIndex();
55     }
56
57     @Override
58     public final void reset(final long index) {
59         // If the current segment is not open, it has been replaced. Reset the segments.
60         if (!currentSegment.isOpen()) {
61             reset();
62         }
63
64         if (index < nextIndex) {
65             rewind(index);
66         } else if (index > nextIndex) {
67             while (index > nextIndex && tryNext() != null) {
68                 // Nothing else
69             }
70         } else {
71             resetCurrentReader(index);
72         }
73     }
74
75     private void resetCurrentReader(final long index) {
76         final var position = currentSegment.lookup(index - 1);
77         if (position != null) {
78             nextIndex = position.index();
79             currentReader.setPosition(position.position());
80         } else {
81             nextIndex = currentSegment.firstIndex();
82             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
83         }
84         while (nextIndex < index && tryNext() != null) {
85             // Nothing else
86         }
87     }
88
89     /**
90      * Rewinds the journal to the given index.
91      */
92     private void rewind(final long index) {
93         if (currentSegment.firstIndex() >= index) {
94             JournalSegment segment = journal.getSegment(index - 1);
95             if (segment != null) {
96                 currentReader.close();
97
98                 currentSegment = segment;
99                 currentReader = currentSegment.createReader();
100             }
101         }
102
103         resetCurrentReader(index);
104     }
105
106     @Override
107     public Indexed<E> tryNext() {
108         final var index = nextIndex;
109         var buf = currentReader.readBytes(index);
110         if (buf == null) {
111             final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
112             if (nextSegment == null || nextSegment.firstIndex() != index) {
113                 return null;
114             }
115
116             currentReader.close();
117
118             currentSegment = nextSegment;
119             currentReader = currentSegment.createReader();
120             buf = currentReader.readBytes(index);
121             if (buf == null) {
122                 return null;
123             }
124         }
125
126         final var entry = journal.serializer().deserialize(buf);
127         nextIndex = index + 1;
128         return new Indexed<>(index, entry, buf.readableBytes());
129     }
130
131     @Override
132     public final void close() {
133         currentReader.close();
134         journal.closeReader(this);
135     }
136 }