Optimize SegmentedJournalReader.getCurrentEntry()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static java.util.Objects.requireNonNull;
19
20 import java.util.NoSuchElementException;
21
22 /**
23  * A {@link JournalReader} traversing all entries.
24  */
25 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
26   final SegmentedJournal<E> journal;
27   private JournalSegment<E> currentSegment;
28   private Indexed<E> previousEntry;
29   private JournalSegmentReader<E> currentReader;
30
31   SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
32     this.journal = requireNonNull(journal);
33     currentSegment = requireNonNull(segment);
34     currentReader = segment.createReader();
35   }
36
37   @Override
38   public final long getFirstIndex() {
39     return journal.getFirstSegment().index();
40   }
41
42   @Override
43   public final long getCurrentIndex() {
44     long currentIndex = currentReader.getCurrentIndex();
45     if (currentIndex != 0) {
46       return currentIndex;
47     }
48     if (previousEntry != null) {
49       return previousEntry.index();
50     }
51     return 0;
52   }
53
54   @Override
55   public final Indexed<E> getCurrentEntry() {
56     // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
57     // That segment may be empty, though, in which case we need to report the previousEntry.
58     final Indexed<E> currentEntry;
59     return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
60   }
61
62   @Override
63   public final long getNextIndex() {
64     return currentReader.getNextIndex();
65   }
66
67   @Override
68   public final void reset() {
69     previousEntry = null;
70     currentReader.close();
71
72     currentSegment = journal.getFirstSegment();
73     currentReader = currentSegment.createReader();
74   }
75
76   @Override
77   public final void reset(long index) {
78     // If the current segment is not open, it has been replaced. Reset the segments.
79     if (!currentSegment.isOpen()) {
80       reset();
81     }
82
83     if (index < currentReader.getNextIndex()) {
84       rewind(index);
85     } else if (index > currentReader.getNextIndex()) {
86       forward(index);
87     } else {
88       currentReader.reset(index);
89     }
90   }
91
92   /**
93    * Rewinds the journal to the given index.
94    */
95   private void rewind(long index) {
96     if (currentSegment.index() >= index) {
97       JournalSegment<E> segment = journal.getSegment(index - 1);
98       if (segment != null) {
99         currentReader.close();
100
101         currentSegment = segment;
102         currentReader = currentSegment.createReader();
103       }
104     }
105
106     currentReader.reset(index);
107     previousEntry = currentReader.getCurrentEntry();
108   }
109
110   /**
111    * Fast forwards the journal to the given index.
112    */
113   private void forward(long index) {
114     while (getNextIndex() < index && hasNext()) {
115       next();
116     }
117   }
118
119   @Override
120   public boolean hasNext() {
121     return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext();
122   }
123
124   @Override
125   public final Indexed<E> next() {
126     if (currentReader.hasNext()) {
127       previousEntry = currentReader.getCurrentEntry();
128       return currentReader.next();
129     }
130     if (moveToNextSegment()) {
131       return currentReader.next();
132     }
133     throw new NoSuchElementException();
134   }
135
136   @Override
137   public final void close() {
138     currentReader.close();
139     journal.closeReader(this);
140   }
141
142   private boolean moveToNextSegment() {
143     final var nextSegment = journal.getNextSegment(currentSegment.index());
144     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
145       return false;
146     }
147
148     previousEntry = currentReader.getCurrentEntry();
149     currentReader.close();
150
151     currentSegment = nextSegment;
152     currentReader = currentSegment.createReader();
153     return true;
154   }
155 }