Hide JournalSegment
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.util.NoSuchElementException;
19
20 /**
21  * Raft log reader.
22  */
23 public final class SegmentedJournalReader<E> implements JournalReader<E> {
24   private final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private Indexed<E> previousEntry;
27   private MappableJournalSegmentReader<E> currentReader;
28   private final Mode mode;
29
30   SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
31     this.journal = journal;
32     this.mode = mode;
33     currentSegment = journal.getSegment(index);
34     currentSegment.acquire();
35     currentReader = currentSegment.createReader();
36
37     long nextIndex = getNextIndex();
38     while (index > nextIndex && hasNext()) {
39       next();
40       nextIndex = getNextIndex();
41     }
42   }
43
44   @Override
45   public long getFirstIndex() {
46     return journal.getFirstSegment().index();
47   }
48
49   @Override
50   public long getCurrentIndex() {
51     long currentIndex = currentReader.getCurrentIndex();
52     if (currentIndex != 0) {
53       return currentIndex;
54     }
55     if (previousEntry != null) {
56       return previousEntry.index();
57     }
58     return 0;
59   }
60
61   @Override
62   public Indexed<E> getCurrentEntry() {
63     Indexed<E> currentEntry = currentReader.getCurrentEntry();
64     if (currentEntry != null) {
65       return currentEntry;
66     }
67     return previousEntry;
68   }
69
70   @Override
71   public long getNextIndex() {
72     return currentReader.getNextIndex();
73   }
74
75   @Override
76   public void reset() {
77     previousEntry = null;
78     currentReader.close();
79     currentSegment.release();
80
81     currentSegment = journal.getFirstSegment();
82     currentSegment.acquire();
83     currentReader = currentSegment.createReader();
84   }
85
86   @Override
87   public void reset(long index) {
88     // If the current segment is not open, it has been replaced. Reset the segments.
89     if (!currentSegment.isOpen()) {
90       reset();
91     }
92
93     if (index < currentReader.getNextIndex()) {
94       rewind(index);
95     } else if (index > currentReader.getNextIndex()) {
96       forward(index);
97     } else {
98       currentReader.reset(index);
99     }
100   }
101
102   /**
103    * Rewinds the journal to the given index.
104    */
105   private void rewind(long index) {
106     if (currentSegment.index() >= index) {
107       JournalSegment<E> segment = journal.getSegment(index - 1);
108       if (segment != null) {
109         currentReader.close();
110         currentSegment.release();
111
112         currentSegment = segment;
113         currentSegment.acquire();
114         currentReader = currentSegment.createReader();
115       }
116     }
117
118     currentReader.reset(index);
119     previousEntry = currentReader.getCurrentEntry();
120   }
121
122   /**
123    * Fast forwards the journal to the given index.
124    */
125   private void forward(long index) {
126     while (getNextIndex() < index && hasNext()) {
127       next();
128     }
129   }
130
131   @Override
132   public boolean hasNext() {
133     if (mode == Mode.ALL) {
134       return hasNextEntry();
135     }
136
137     long nextIndex = getNextIndex();
138     long commitIndex = journal.getCommitIndex();
139     return nextIndex <= commitIndex && hasNextEntry();
140   }
141
142   private boolean hasNextEntry() {
143     if (currentReader.hasNext()) {
144       return true;
145     }
146     return moveToNextSegment() ? currentReader.hasNext() : false;
147   }
148
149   @Override
150   public Indexed<E> next() {
151     if (currentReader.hasNext()) {
152       previousEntry = currentReader.getCurrentEntry();
153       return currentReader.next();
154     }
155     if (moveToNextSegment()) {
156       return currentReader.next();
157     }
158     throw new NoSuchElementException();
159   }
160
161   @Override
162   public void close() {
163     currentReader.close();
164     journal.closeReader(this);
165   }
166
167   private boolean moveToNextSegment() {
168     final var nextSegment = journal.getNextSegment(currentSegment.index());
169     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
170       return false;
171     }
172
173     previousEntry = currentReader.getCurrentEntry();
174     currentReader.close();
175     currentSegment.release();
176
177     currentSegment = nextSegment;
178     currentSegment.acquire();
179     currentReader = currentSegment.createReader();
180     return true;
181   }
182 }