Inline SegmentedJournalReader.initialize()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.util.NoSuchElementException;
19
20 /**
21  * Raft log reader.
22  */
23 public class SegmentedJournalReader<E> implements JournalReader<E> {
24   private final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private Indexed<E> previousEntry;
27   private MappableJournalSegmentReader<E> currentReader;
28   private final Mode mode;
29
30   public SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
31     this.journal = journal;
32     this.mode = mode;
33     currentSegment = journal.getSegment(index);
34     currentSegment.acquire();
35     currentReader = currentSegment.createReader();
36     long nextIndex = getNextIndex();
37     while (index > nextIndex && hasNext()) {
38       next();
39       nextIndex = getNextIndex();
40     }
41   }
42
43   @Override
44   public long getFirstIndex() {
45     return journal.getFirstSegment().index();
46   }
47
48   @Override
49   public long getCurrentIndex() {
50     long currentIndex = currentReader.getCurrentIndex();
51     if (currentIndex != 0) {
52       return currentIndex;
53     }
54     if (previousEntry != null) {
55       return previousEntry.index();
56     }
57     return 0;
58   }
59
60   @Override
61   public Indexed<E> getCurrentEntry() {
62     Indexed<E> currentEntry = currentReader.getCurrentEntry();
63     if (currentEntry != null) {
64       return currentEntry;
65     }
66     return previousEntry;
67   }
68
69   @Override
70   public long getNextIndex() {
71     return currentReader.getNextIndex();
72   }
73
74   @Override
75   public void reset() {
76     currentReader.close();
77     currentSegment.release();
78     currentSegment = journal.getFirstSegment();
79     currentSegment.acquire();
80     currentReader = currentSegment.createReader();
81     previousEntry = null;
82   }
83
84   @Override
85   public void reset(long index) {
86     // If the current segment is not open, it has been replaced. Reset the segments.
87     if (!currentSegment.isOpen()) {
88       reset();
89     }
90
91     if (index < currentReader.getNextIndex()) {
92       rewind(index);
93     } else if (index > currentReader.getNextIndex()) {
94       forward(index);
95     } else {
96       currentReader.reset(index);
97     }
98   }
99
100   /**
101    * Rewinds the journal to the given index.
102    */
103   private void rewind(long index) {
104     if (currentSegment.index() >= index) {
105       JournalSegment<E> segment = journal.getSegment(index - 1);
106       if (segment != null) {
107         currentReader.close();
108         currentSegment.release();
109         currentSegment = segment;
110         currentSegment.acquire();
111         currentReader = currentSegment.createReader();
112       }
113     }
114
115     currentReader.reset(index);
116     previousEntry = currentReader.getCurrentEntry();
117   }
118
119   /**
120    * Fast forwards the journal to the given index.
121    */
122   private void forward(long index) {
123     while (getNextIndex() < index && hasNext()) {
124       next();
125     }
126   }
127
128   @Override
129   public boolean hasNext() {
130     if (mode == Mode.ALL) {
131       return hasNextEntry();
132     }
133
134     long nextIndex = getNextIndex();
135     long commitIndex = journal.getCommitIndex();
136     return nextIndex <= commitIndex && hasNextEntry();
137   }
138
139   private boolean hasNextEntry() {
140     if (!currentReader.hasNext()) {
141       JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
142       if (nextSegment != null && nextSegment.index() == getNextIndex()) {
143         previousEntry = currentReader.getCurrentEntry();
144         currentSegment.release();
145         currentSegment = nextSegment;
146         currentSegment.acquire();
147         currentReader = currentSegment.createReader();
148         return currentReader.hasNext();
149       }
150       return false;
151     }
152     return true;
153   }
154
155   @Override
156   public Indexed<E> next() {
157     if (!currentReader.hasNext()) {
158       JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
159       if (nextSegment != null && nextSegment.index() == getNextIndex()) {
160         previousEntry = currentReader.getCurrentEntry();
161         currentSegment.release();
162         currentSegment = nextSegment;
163         currentSegment.acquire();
164         currentReader = currentSegment.createReader();
165         return currentReader.next();
166       } else {
167         throw new NoSuchElementException();
168       }
169     } else {
170       previousEntry = currentReader.getCurrentEntry();
171       return currentReader.next();
172     }
173   }
174
175   @Override
176   public void close() {
177     currentReader.close();
178     journal.closeReader(this);
179   }
180 }