2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import java.util.NoSuchElementException;
23 public class SegmentedJournalReader<E> implements JournalReader<E> {
24 private final SegmentedJournal<E> journal;
25 private JournalSegment<E> currentSegment;
26 private Indexed<E> previousEntry;
27 private MappableJournalSegmentReader<E> currentReader;
28 private final Mode mode;
30 public SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
31 this.journal = journal;
33 currentSegment = journal.getSegment(index);
34 currentSegment.acquire();
35 currentReader = currentSegment.createReader();
36 long nextIndex = getNextIndex();
37 while (index > nextIndex && hasNext()) {
39 nextIndex = getNextIndex();
44 public long getFirstIndex() {
45 return journal.getFirstSegment().index();
49 public long getCurrentIndex() {
50 long currentIndex = currentReader.getCurrentIndex();
51 if (currentIndex != 0) {
54 if (previousEntry != null) {
55 return previousEntry.index();
61 public Indexed<E> getCurrentEntry() {
62 Indexed<E> currentEntry = currentReader.getCurrentEntry();
63 if (currentEntry != null) {
70 public long getNextIndex() {
71 return currentReader.getNextIndex();
76 currentReader.close();
77 currentSegment.release();
78 currentSegment = journal.getFirstSegment();
79 currentSegment.acquire();
80 currentReader = currentSegment.createReader();
85 public void reset(long index) {
86 // If the current segment is not open, it has been replaced. Reset the segments.
87 if (!currentSegment.isOpen()) {
91 if (index < currentReader.getNextIndex()) {
93 } else if (index > currentReader.getNextIndex()) {
96 currentReader.reset(index);
101 * Rewinds the journal to the given index.
103 private void rewind(long index) {
104 if (currentSegment.index() >= index) {
105 JournalSegment<E> segment = journal.getSegment(index - 1);
106 if (segment != null) {
107 currentReader.close();
108 currentSegment.release();
109 currentSegment = segment;
110 currentSegment.acquire();
111 currentReader = currentSegment.createReader();
115 currentReader.reset(index);
116 previousEntry = currentReader.getCurrentEntry();
120 * Fast forwards the journal to the given index.
122 private void forward(long index) {
123 while (getNextIndex() < index && hasNext()) {
129 public boolean hasNext() {
130 if (mode == Mode.ALL) {
131 return hasNextEntry();
134 long nextIndex = getNextIndex();
135 long commitIndex = journal.getCommitIndex();
136 return nextIndex <= commitIndex && hasNextEntry();
139 private boolean hasNextEntry() {
140 if (!currentReader.hasNext()) {
141 JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
142 if (nextSegment != null && nextSegment.index() == getNextIndex()) {
143 previousEntry = currentReader.getCurrentEntry();
144 currentSegment.release();
145 currentSegment = nextSegment;
146 currentSegment.acquire();
147 currentReader = currentSegment.createReader();
148 return currentReader.hasNext();
156 public Indexed<E> next() {
157 if (!currentReader.hasNext()) {
158 JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
159 if (nextSegment != null && nextSegment.index() == getNextIndex()) {
160 previousEntry = currentReader.getCurrentEntry();
161 currentSegment.release();
162 currentSegment = nextSegment;
163 currentSegment.acquire();
164 currentReader = currentSegment.createReader();
165 return currentReader.next();
167 throw new NoSuchElementException();
170 previousEntry = currentReader.getCurrentEntry();
171 return currentReader.next();
176 public void close() {
177 currentReader.close();
178 journal.closeReader(this);