2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import java.util.NoSuchElementException;
// JournalReader implementation that reads entries from a SegmentedJournal, one segment at a time.
// NOTE(review): this extract is missing lines (closing braces, annotations, some statements);
// comments describe only what is visible here.
23 public final class SegmentedJournalReader<E> implements JournalReader<E> {
// Journal this reader pulls segments and the commit index from.
24 private final SegmentedJournal<E> journal;
// Segment currently being read; it is acquired whenever it is assigned and released before being replaced.
25 private JournalSegment<E> currentSegment;
// Entry captured from currentReader before advancing or switching segments; used as a fallback
// by getCurrentIndex() when the segment reader reports index 0.
26 private Indexed<E> previousEntry;
// Reader created from currentSegment via createReader(); replaced whenever the segment changes.
27 private MappableJournalSegmentReader<E> currentReader;
// Read mode; hasNext() skips the commit-index check when this equals Mode.ALL.
28 private final Mode mode;
// Creates a reader positioned at (or as close as possible before) the given index.
//   journal - journal to read from
//   index   - index the reader should be advanced towards
//   mode    - read mode consulted by hasNext()
// NOTE(review): the assignment of the final field `mode` is not visible in this extract — confirm it exists.
30 SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
31 this.journal = journal;
// Look up the segment for the requested index, take a reference on it and open a reader over it.
// NOTE(review): the result of getSegment(index) is dereferenced without a null check here, whereas
// rewind() null-checks the same call — confirm getSegment cannot return null for this input.
33 currentSegment = journal.getSegment(index);
34 currentSegment.acquire();
35 currentReader = currentSegment.createReader();
// Keep going while the target index is still ahead of the reader's next index and entries remain.
37 long nextIndex = getNextIndex();
38 while (index > nextIndex && hasNext()) {
// NOTE(review): the statement that actually advances the reader is not visible in this extract
// (presumably a next() call) — confirm. Only the refresh of nextIndex is shown.
40 nextIndex = getNextIndex();
// Returns the index() of the journal's first segment.
45 public long getFirstIndex() {
46 return journal.getFirstSegment().index();
// Reports the index of the current entry, preferring the active segment reader and falling back
// to the remembered previousEntry.
50 public long getCurrentIndex() {
51 long currentIndex = currentReader.getCurrentIndex();
// A non-zero value from the segment reader is handled first.
// NOTE(review): the body of this branch is not visible in this extract (presumably returns currentIndex) — confirm.
52 if (currentIndex != 0) {
// Otherwise fall back to the entry remembered from before the last advance / segment switch.
55 if (previousEntry != null) {
56 return previousEntry.index();
// NOTE(review): the final fallthrough return is not visible in this extract — confirm its value.
// Returns the current entry, consulting the active segment reader first.
62 public Indexed<E> getCurrentEntry() {
63 Indexed<E> currentEntry = currentReader.getCurrentEntry();
// NOTE(review): the branch body and the fallthrough return are not visible in this extract
// (presumably returns currentEntry here and previousEntry otherwise) — confirm.
64 if (currentEntry != null) {
// Returns the next index to be read, as reported by the current segment reader.
71 public long getNextIndex() {
72 return currentReader.getNextIndex();
// NOTE(review): the method declaration for these statements is not visible in this extract;
// presumably this is the no-argument reset() — confirm.
// Drop the current reader and give back the reference on the current segment.
78 currentReader.close();
79 currentSegment.release();
// Switch to the journal's first segment: acquire it and open a fresh reader over it.
81 currentSegment = journal.getFirstSegment();
82 currentSegment.acquire();
83 currentReader = currentSegment.createReader();
// Repositions the reader relative to the given index.
//   index - target index, compared against the current reader's next index
87 public void reset(long index) {
88 // If the current segment is not open, it has been replaced. Reset the segments.
// NOTE(review): the body of this branch is not visible in this extract (presumably calls reset()) — confirm.
89 if (!currentSegment.isOpen()) {
// Target is behind the reader's next index.
// NOTE(review): branch body not visible (presumably rewind(index)) — confirm.
93 if (index < currentReader.getNextIndex()) {
// Target is ahead of the reader's next index.
// NOTE(review): branch body not visible (presumably forward(index)) — confirm.
95 } else if (index > currentReader.getNextIndex()) {
// Resets the segment reader directly to the index.
// NOTE(review): presumably this sits in a final else (index equals next index); the else line is not visible — confirm.
98 currentReader.reset(index);
103 * Rewinds the journal to the given index.
// Moves the reader backwards to the given index, switching to an earlier segment when needed.
//   index - target index to rewind to
105 private void rewind(long index) {
// The current segment starts at or after the target, so look up the segment holding index - 1.
106 if (currentSegment.index() >= index) {
107 JournalSegment<E> segment = journal.getSegment(index - 1);
// Only switch when such a segment exists; otherwise the current segment and reader are kept.
108 if (segment != null) {
// Close the old reader and release the old segment before taking the new one.
109 currentReader.close();
110 currentSegment.release();
112 currentSegment = segment;
113 currentSegment.acquire();
114 currentReader = currentSegment.createReader();
// Reset the (possibly new) segment reader to the index and remember its current entry as previousEntry.
// NOTE(review): closing braces are not visible in this extract; presumably these two statements
// run unconditionally after the segment switch — confirm.
118 currentReader.reset(index);
119 previousEntry = currentReader.getCurrentEntry();
123 * Fast forwards the journal to the given index.
// Advances the reader while its next index is below the given index and hasNext() is true.
//   index - target index to advance towards
// NOTE(review): the loop body is not visible in this extract (presumably a next() call) — confirm.
125 private void forward(long index) {
126 while (getNextIndex() < index && hasNext()) {
// Reports whether another entry can be read under the configured mode.
132 public boolean hasNext() {
// In Mode.ALL the commit index is not consulted; any available entry counts.
133 if (mode == Mode.ALL) {
134 return hasNextEntry();
// Otherwise an entry is only reported when the next index does not exceed the journal's commit index.
137 long nextIndex = getNextIndex();
138 long commitIndex = journal.getCommitIndex();
139 return nextIndex <= commitIndex && hasNextEntry();
// Checks for a next entry in the current segment and, failing that, tries the following segment.
142 private boolean hasNextEntry() {
// NOTE(review): the body of this branch is not visible in this extract (presumably returns true) — confirm.
143 if (currentReader.hasNext()) {
// Try to move to the next segment; if that succeeds, ask the new segment reader.
// The ternary is equivalent to `moveToNextSegment() && currentReader.hasNext()`.
146 return moveToNextSegment() ? currentReader.hasNext() : false;
// Returns the next entry, moving on to the following segment when the current one has no more.
// Throws NoSuchElementException when neither the current segment nor a following segment yields an entry.
150 public Indexed<E> next() {
151 if (currentReader.hasNext()) {
// Remember the entry being left behind before the segment reader advances.
152 previousEntry = currentReader.getCurrentEntry();
153 return currentReader.next();
// Current segment has no next entry: try the next segment (moveToNextSegment records previousEntry itself).
155 if (moveToNextSegment()) {
156 return currentReader.next();
158 throw new NoSuchElementException();
// Closes the current segment reader and notifies the journal via closeReader(this).
// NOTE(review): unlike the other places that discard currentReader, currentSegment.release() is not
// called here — confirm whether closeReader()/close() already balances the earlier acquire().
162 public void close() {
163 currentReader.close();
164 journal.closeReader(this);
// Attempts to switch this reader to the segment following the current one.
// NOTE(review): both return statements are not visible in this extract (presumably false when the
// guard below matches and true after the switch) — confirm.
167 private boolean moveToNextSegment() {
168 final var nextSegment = journal.getNextSegment(currentSegment.index());
// No switch when there is no following segment, or when it does not start exactly at the next index to read.
169 if (nextSegment == null || nextSegment.index() != getNextIndex()) {
// Remember the last entry of the segment being left, then close its reader and release it.
173 previousEntry = currentReader.getCurrentEntry();
174 currentReader.close();
175 currentSegment.release();
// Take a reference on the new segment and open a reader over it.
177 currentSegment = nextSegment;
178 currentSegment.acquire();
179 currentReader = currentSegment.createReader();