2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import java.util.NoSuchElementException;
23 public class SegmentedJournalReader<E> implements JournalReader<E> {
25 private final SegmentedJournal<E> journal;
26 private JournalSegment<E> currentSegment;
27 private Indexed<E> previousEntry;
28 private MappableJournalSegmentReader<E> currentReader;
29 private final Mode mode;
31 public SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
32 this.journal = journal;
38 * Initializes the reader to the given index.
40 private void initialize(long index) {
41 currentSegment = journal.getSegment(index);
42 currentSegment.acquire();
43 currentReader = currentSegment.createReader();
44 long nextIndex = getNextIndex();
45 while (index > nextIndex && hasNext()) {
47 nextIndex = getNextIndex();
52 public long getFirstIndex() {
53 return journal.getFirstSegment().index();
57 public long getCurrentIndex() {
58 long currentIndex = currentReader.getCurrentIndex();
59 if (currentIndex != 0) {
62 if (previousEntry != null) {
63 return previousEntry.index();
69 public Indexed<E> getCurrentEntry() {
70 Indexed<E> currentEntry = currentReader.getCurrentEntry();
71 if (currentEntry != null) {
78 public long getNextIndex() {
79 return currentReader.getNextIndex();
84 currentReader.close();
85 currentSegment.release();
86 currentSegment = journal.getFirstSegment();
87 currentSegment.acquire();
88 currentReader = currentSegment.createReader();
93 public void reset(long index) {
94 // If the current segment is not open, it has been replaced. Reset the segments.
95 if (!currentSegment.isOpen()) {
99 if (index < currentReader.getNextIndex()) {
101 } else if (index > currentReader.getNextIndex()) {
104 currentReader.reset(index);
109 * Rewinds the journal to the given index.
111 private void rewind(long index) {
112 if (currentSegment.index() >= index) {
113 JournalSegment<E> segment = journal.getSegment(index - 1);
114 if (segment != null) {
115 currentReader.close();
116 currentSegment.release();
117 currentSegment = segment;
118 currentSegment.acquire();
119 currentReader = currentSegment.createReader();
123 currentReader.reset(index);
124 previousEntry = currentReader.getCurrentEntry();
128 * Fast forwards the journal to the given index.
130 private void forward(long index) {
131 while (getNextIndex() < index && hasNext()) {
137 public boolean hasNext() {
138 if (mode == Mode.ALL) {
139 return hasNextEntry();
142 long nextIndex = getNextIndex();
143 long commitIndex = journal.getCommitIndex();
144 return nextIndex <= commitIndex && hasNextEntry();
147 private boolean hasNextEntry() {
148 if (!currentReader.hasNext()) {
149 JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
150 if (nextSegment != null && nextSegment.index() == getNextIndex()) {
151 previousEntry = currentReader.getCurrentEntry();
152 currentSegment.release();
153 currentSegment = nextSegment;
154 currentSegment.acquire();
155 currentReader = currentSegment.createReader();
156 return currentReader.hasNext();
164 public Indexed<E> next() {
165 if (!currentReader.hasNext()) {
166 JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
167 if (nextSegment != null && nextSegment.index() == getNextIndex()) {
168 previousEntry = currentReader.getCurrentEntry();
169 currentSegment.release();
170 currentSegment = nextSegment;
171 currentSegment.acquire();
172 currentReader = currentSegment.createReader();
173 return currentReader.next();
175 throw new NoSuchElementException();
178 previousEntry = currentReader.getCurrentEntry();
179 return currentReader.next();
184 public void close() {
185 currentReader.close();
186 journal.closeReader(this);