Use ConcurrentNavigableMap
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.util.NoSuchElementException;
19
20 /**
21  * Raft log reader.
22  */
23 public class SegmentedJournalReader<E> implements JournalReader<E> {
24
25   private final SegmentedJournal<E> journal;
26   private JournalSegment<E> currentSegment;
27   private Indexed<E> previousEntry;
28   private MappableJournalSegmentReader<E> currentReader;
29   private final Mode mode;
30
31   public SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
32     this.journal = journal;
33     this.mode = mode;
34     initialize(index);
35   }
36
37   /**
38    * Initializes the reader to the given index.
39    */
40   private void initialize(long index) {
41     currentSegment = journal.getSegment(index);
42     currentSegment.acquire();
43     currentReader = currentSegment.createReader();
44     long nextIndex = getNextIndex();
45     while (index > nextIndex && hasNext()) {
46       next();
47       nextIndex = getNextIndex();
48     }
49   }
50
51   @Override
52   public long getFirstIndex() {
53     return journal.getFirstSegment().index();
54   }
55
56   @Override
57   public long getCurrentIndex() {
58     long currentIndex = currentReader.getCurrentIndex();
59     if (currentIndex != 0) {
60       return currentIndex;
61     }
62     if (previousEntry != null) {
63       return previousEntry.index();
64     }
65     return 0;
66   }
67
68   @Override
69   public Indexed<E> getCurrentEntry() {
70     Indexed<E> currentEntry = currentReader.getCurrentEntry();
71     if (currentEntry != null) {
72       return currentEntry;
73     }
74     return previousEntry;
75   }
76
77   @Override
78   public long getNextIndex() {
79     return currentReader.getNextIndex();
80   }
81
82   @Override
83   public void reset() {
84     currentReader.close();
85     currentSegment.release();
86     currentSegment = journal.getFirstSegment();
87     currentSegment.acquire();
88     currentReader = currentSegment.createReader();
89     previousEntry = null;
90   }
91
92   @Override
93   public void reset(long index) {
94     // If the current segment is not open, it has been replaced. Reset the segments.
95     if (!currentSegment.isOpen()) {
96       reset();
97     }
98
99     if (index < currentReader.getNextIndex()) {
100       rewind(index);
101     } else if (index > currentReader.getNextIndex()) {
102       forward(index);
103     } else {
104       currentReader.reset(index);
105     }
106   }
107
108   /**
109    * Rewinds the journal to the given index.
110    */
111   private void rewind(long index) {
112     if (currentSegment.index() >= index) {
113       JournalSegment<E> segment = journal.getSegment(index - 1);
114       if (segment != null) {
115         currentReader.close();
116         currentSegment.release();
117         currentSegment = segment;
118         currentSegment.acquire();
119         currentReader = currentSegment.createReader();
120       }
121     }
122
123     currentReader.reset(index);
124     previousEntry = currentReader.getCurrentEntry();
125   }
126
127   /**
128    * Fast forwards the journal to the given index.
129    */
130   private void forward(long index) {
131     while (getNextIndex() < index && hasNext()) {
132       next();
133     }
134   }
135
136   @Override
137   public boolean hasNext() {
138     if (mode == Mode.ALL) {
139       return hasNextEntry();
140     }
141
142     long nextIndex = getNextIndex();
143     long commitIndex = journal.getCommitIndex();
144     return nextIndex <= commitIndex && hasNextEntry();
145   }
146
147   private boolean hasNextEntry() {
148     if (!currentReader.hasNext()) {
149       JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
150       if (nextSegment != null && nextSegment.index() == getNextIndex()) {
151         previousEntry = currentReader.getCurrentEntry();
152         currentSegment.release();
153         currentSegment = nextSegment;
154         currentSegment.acquire();
155         currentReader = currentSegment.createReader();
156         return currentReader.hasNext();
157       }
158       return false;
159     }
160     return true;
161   }
162
163   @Override
164   public Indexed<E> next() {
165     if (!currentReader.hasNext()) {
166       JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
167       if (nextSegment != null && nextSegment.index() == getNextIndex()) {
168         previousEntry = currentReader.getCurrentEntry();
169         currentSegment.release();
170         currentSegment = nextSegment;
171         currentSegment.acquire();
172         currentReader = currentSegment.createReader();
173         return currentReader.next();
174       } else {
175         throw new NoSuchElementException();
176       }
177     } else {
178       previousEntry = currentReader.getCurrentEntry();
179       return currentReader.next();
180     }
181   }
182
183   @Override
184   public void close() {
185     currentReader.close();
186     journal.closeReader(this);
187   }
188 }