Eliminate MappableJournalSegmentReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.util.NoSuchElementException;
19
20 /**
21  * Raft log reader.
22  */
23 public final class SegmentedJournalReader<E> implements JournalReader<E> {
24   private final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private Indexed<E> previousEntry;
27   private JournalSegmentReader<E> currentReader;
28   private final Mode mode;
29
30   SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
31     this.journal = journal;
32     this.mode = mode;
33     currentSegment = journal.getSegment(index);
34     currentReader = currentSegment.createReader();
35
36     long nextIndex = getNextIndex();
37     while (index > nextIndex && hasNext()) {
38       next();
39       nextIndex = getNextIndex();
40     }
41   }
42
43   @Override
44   public long getFirstIndex() {
45     return journal.getFirstSegment().index();
46   }
47
48   @Override
49   public long getCurrentIndex() {
50     long currentIndex = currentReader.getCurrentIndex();
51     if (currentIndex != 0) {
52       return currentIndex;
53     }
54     if (previousEntry != null) {
55       return previousEntry.index();
56     }
57     return 0;
58   }
59
60   @Override
61   public Indexed<E> getCurrentEntry() {
62     Indexed<E> currentEntry = currentReader.getCurrentEntry();
63     if (currentEntry != null) {
64       return currentEntry;
65     }
66     return previousEntry;
67   }
68
69   @Override
70   public long getNextIndex() {
71     return currentReader.getNextIndex();
72   }
73
74   @Override
75   public void reset() {
76     previousEntry = null;
77     currentReader.close();
78
79     currentSegment = journal.getFirstSegment();
80     currentReader = currentSegment.createReader();
81   }
82
83   @Override
84   public void reset(long index) {
85     // If the current segment is not open, it has been replaced. Reset the segments.
86     if (!currentSegment.isOpen()) {
87       reset();
88     }
89
90     if (index < currentReader.getNextIndex()) {
91       rewind(index);
92     } else if (index > currentReader.getNextIndex()) {
93       forward(index);
94     } else {
95       currentReader.reset(index);
96     }
97   }
98
99   /**
100    * Rewinds the journal to the given index.
101    */
102   private void rewind(long index) {
103     if (currentSegment.index() >= index) {
104       JournalSegment<E> segment = journal.getSegment(index - 1);
105       if (segment != null) {
106         currentReader.close();
107
108         currentSegment = segment;
109         currentReader = currentSegment.createReader();
110       }
111     }
112
113     currentReader.reset(index);
114     previousEntry = currentReader.getCurrentEntry();
115   }
116
117   /**
118    * Fast forwards the journal to the given index.
119    */
120   private void forward(long index) {
121     while (getNextIndex() < index && hasNext()) {
122       next();
123     }
124   }
125
126   @Override
127   public boolean hasNext() {
128     if (mode == Mode.ALL) {
129       return hasNextEntry();
130     }
131
132     long nextIndex = getNextIndex();
133     long commitIndex = journal.getCommitIndex();
134     return nextIndex <= commitIndex && hasNextEntry();
135   }
136
137   private boolean hasNextEntry() {
138     if (currentReader.hasNext()) {
139       return true;
140     }
141     return moveToNextSegment() ? currentReader.hasNext() : false;
142   }
143
144   @Override
145   public Indexed<E> next() {
146     if (currentReader.hasNext()) {
147       previousEntry = currentReader.getCurrentEntry();
148       return currentReader.next();
149     }
150     if (moveToNextSegment()) {
151       return currentReader.next();
152     }
153     throw new NoSuchElementException();
154   }
155
156   @Override
157   public void close() {
158     currentReader.close();
159     journal.closeReader(this);
160   }
161
162   private boolean moveToNextSegment() {
163     final var nextSegment = journal.getNextSegment(currentSegment.index());
164     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
165       return false;
166     }
167
168     previousEntry = currentReader.getCurrentEntry();
169     currentReader.close();
170
171     currentSegment = nextSegment;
172     currentReader = currentSegment.createReader();
173     return true;
174   }
175 }