JournalReader is not an Iterator
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static java.util.Objects.requireNonNull;
19
20 /**
21  * A {@link JournalReader} traversing all entries.
22  */
23 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
24   final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private Indexed<E> previousEntry;
27   private JournalSegmentReader<E> currentReader;
28
29   SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
30     this.journal = requireNonNull(journal);
31     currentSegment = requireNonNull(segment);
32     currentReader = segment.createReader();
33   }
34
35   @Override
36   public final long getFirstIndex() {
37     return journal.getFirstSegment().index();
38   }
39
40   @Override
41   public final long getCurrentIndex() {
42     final var currentEntry = currentReader.getCurrentEntry();
43     if (currentEntry != null) {
44       final long currentIndex = currentEntry.index();
45       if (currentIndex != 0) {
46         return currentIndex;
47       }
48     }
49     return previousEntry != null ? previousEntry.index() : 0;
50   }
51
52   @Override
53   public final Indexed<E> getCurrentEntry() {
54     // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
55     // That segment may be empty, though, in which case we need to report the previousEntry.
56     final Indexed<E> currentEntry;
57     return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
58   }
59
60   @Override
61   public final long getNextIndex() {
62     return currentReader.getNextIndex();
63   }
64
65   @Override
66   public final void reset() {
67     previousEntry = null;
68     currentReader.close();
69
70     currentSegment = journal.getFirstSegment();
71     currentReader = currentSegment.createReader();
72   }
73
74   @Override
75   public final void reset(long index) {
76     // If the current segment is not open, it has been replaced. Reset the segments.
77     if (!currentSegment.isOpen()) {
78       reset();
79     }
80
81     final var nextIndex = currentReader.getNextIndex();
82     if (index < nextIndex) {
83       rewind(index);
84     } else if (index > nextIndex) {
85       forward(index);
86     } else {
87       currentReader.reset(index);
88     }
89   }
90
91   /**
92    * Rewinds the journal to the given index.
93    */
94   private void rewind(long index) {
95     if (currentSegment.index() >= index) {
96       JournalSegment<E> segment = journal.getSegment(index - 1);
97       if (segment != null) {
98         currentReader.close();
99
100         currentSegment = segment;
101         currentReader = currentSegment.createReader();
102       }
103     }
104
105     currentReader.reset(index);
106     previousEntry = currentReader.getCurrentEntry();
107   }
108
109   /**
110    * Fast forwards the journal to the given index.
111    */
112   private void forward(long index) {
113     while (getNextIndex() < index && tryNext() != null) {
114       // Nothing else
115     }
116   }
117
118   @Override
119   public Indexed<E> tryNext() {
120     if (currentReader.hasNext()) {
121       previousEntry = currentReader.getCurrentEntry();
122       return currentReader.next();
123     }
124
125     final var nextSegment = journal.getNextSegment(currentSegment.index());
126     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
127       return null;
128     }
129
130     previousEntry = currentReader.getCurrentEntry();
131     currentReader.close();
132
133     currentSegment = nextSegment;
134     currentReader = currentSegment.createReader();
135     return currentReader.hasNext() ? currentReader.next() : null;
136   }
137
138   @Override
139   public final void close() {
140     currentReader.close();
141     journal.closeReader(this);
142   }
143 }