Optimize SegmentedJournalReader.hasNext()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static java.util.Objects.requireNonNull;
19
20 import java.util.NoSuchElementException;
21
22 /**
23  * A {@link JournalReader} traversing all entries.
24  */
25 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
26   final SegmentedJournal<E> journal;
27   private JournalSegment<E> currentSegment;
28   private Indexed<E> previousEntry;
29   private JournalSegmentReader<E> currentReader;
30
31   SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
32     this.journal = requireNonNull(journal);
33     currentSegment = requireNonNull(segment);
34     currentReader = segment.createReader();
35   }
36
37   @Override
38   public final long getFirstIndex() {
39     return journal.getFirstSegment().index();
40   }
41
42   @Override
43   public final long getCurrentIndex() {
44     long currentIndex = currentReader.getCurrentIndex();
45     if (currentIndex != 0) {
46       return currentIndex;
47     }
48     if (previousEntry != null) {
49       return previousEntry.index();
50     }
51     return 0;
52   }
53
54   @Override
55   public final Indexed<E> getCurrentEntry() {
56     Indexed<E> currentEntry = currentReader.getCurrentEntry();
57     if (currentEntry != null) {
58       return currentEntry;
59     }
60     return previousEntry;
61   }
62
63   @Override
64   public final long getNextIndex() {
65     return currentReader.getNextIndex();
66   }
67
68   @Override
69   public final void reset() {
70     previousEntry = null;
71     currentReader.close();
72
73     currentSegment = journal.getFirstSegment();
74     currentReader = currentSegment.createReader();
75   }
76
77   @Override
78   public final void reset(long index) {
79     // If the current segment is not open, it has been replaced. Reset the segments.
80     if (!currentSegment.isOpen()) {
81       reset();
82     }
83
84     if (index < currentReader.getNextIndex()) {
85       rewind(index);
86     } else if (index > currentReader.getNextIndex()) {
87       forward(index);
88     } else {
89       currentReader.reset(index);
90     }
91   }
92
93   /**
94    * Rewinds the journal to the given index.
95    */
96   private void rewind(long index) {
97     if (currentSegment.index() >= index) {
98       JournalSegment<E> segment = journal.getSegment(index - 1);
99       if (segment != null) {
100         currentReader.close();
101
102         currentSegment = segment;
103         currentReader = currentSegment.createReader();
104       }
105     }
106
107     currentReader.reset(index);
108     previousEntry = currentReader.getCurrentEntry();
109   }
110
111   /**
112    * Fast forwards the journal to the given index.
113    */
114   private void forward(long index) {
115     while (getNextIndex() < index && hasNext()) {
116       next();
117     }
118   }
119
120   @Override
121   public boolean hasNext() {
122     return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext();
123   }
124
125   @Override
126   public final Indexed<E> next() {
127     if (currentReader.hasNext()) {
128       previousEntry = currentReader.getCurrentEntry();
129       return currentReader.next();
130     }
131     if (moveToNextSegment()) {
132       return currentReader.next();
133     }
134     throw new NoSuchElementException();
135   }
136
137   @Override
138   public final void close() {
139     currentReader.close();
140     journal.closeReader(this);
141   }
142
143   private boolean moveToNextSegment() {
144     final var nextSegment = journal.getNextSegment(currentSegment.index());
145     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
146       return false;
147     }
148
149     previousEntry = currentReader.getCurrentEntry();
150     currentReader.close();
151
152     currentSegment = nextSegment;
153     currentReader = currentSegment.createReader();
154     return true;
155   }
156 }