2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static java.util.Objects.requireNonNull;
20 import java.util.NoSuchElementException;
23 * A {@link JournalReader} traversing all entries.
25 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
// Journal this reader was created for; it supplies segments and is notified when the reader closes.
26 final SegmentedJournal<E> journal;
// Segment that currentReader was created from; replaced whenever the reader moves to another segment.
27 private JournalSegment<E> currentSegment;
// Entry captured from currentReader before advancing or switching segments; getCurrentEntry() falls back
// to it when the active segment reader reports no current entry.
28 private Indexed<E> previousEntry;
// Reader over currentSegment; closed and re-created each time currentSegment changes.
29 private JournalSegmentReader<E> currentReader;
/**
 * Creates a reader for {@code journal} that starts out reading {@code segment}.
 *
 * @param journal the journal being read
 * @param segment the segment the initial segment reader is created from
 * @throws NullPointerException if {@code journal} or {@code segment} is {@code null}
 */
SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
    this.journal = requireNonNull(journal);
    this.currentSegment = requireNonNull(segment);
    // The segment has just been null-checked, so the reader can be created from the field.
    this.currentReader = this.currentSegment.createReader();
}
38 public final long getFirstIndex() {
39 return journal.getFirstSegment().index();
/**
 * Reports the index this reader currently stands on, consulting the active segment reader first and the
 * remembered {@code previousEntry} second.
 *
 * NOTE(review): the branch bodies and the final fallback return of this method are not visible in this
 * text; confirm what is returned when the segment reader reports a non-zero index and when there is no
 * previous entry.
 */
43 public final long getCurrentIndex() {
// Ask the reader of the current segment for its position.
44 long currentIndex = currentReader.getCurrentIndex();
// Zero is treated as "the segment reader has no current index" -- presumably a non-zero value is returned
// as-is; TODO confirm, the body of this branch is not shown here.
45 if (currentIndex != 0) {
// Fall back to the entry remembered from before the last advance or segment switch, if there is one.
48 if (previousEntry != null) {
49 return previousEntry.index();
55 public final Indexed<E> getCurrentEntry() {
56 // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
57 // That segment may be empty, though, in which case we need to report the previousEntry.
58 final Indexed<E> currentEntry;
59 return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
63 public final long getNextIndex() {
64 return currentReader.getNextIndex();
/**
 * Repositions this reader onto the journal's first segment: the active segment reader is closed and a
 * fresh one is created from {@code journal.getFirstSegment()}.
 *
 * NOTE(review): only the segment/reader swap is visible in this text; confirm whether
 * {@code previousEntry} is also meant to be cleared here.
 */
68 public final void reset() {
// Release the reader of the segment being left before replacing it.
70 currentReader.close();
// Start over from the first segment the journal currently has.
72 currentSegment = journal.getFirstSegment();
73 currentReader = currentSegment.createReader();
/**
 * Repositions this reader relative to {@code index}, choosing a strategy by comparing the target with
 * the active segment reader's next index.
 *
 * NOTE(review): the bodies of the branches below are not visible in this text; the notes on each branch
 * are assumptions to be confirmed against the full method.
 *
 * @param index the index to reposition to
 */
77 public final void reset(long index) {
78 // If the current segment is not open, it has been replaced. Reset the segments.
79 if (!currentSegment.isOpen()) {
// NOTE(review): branch body not shown -- presumably calls reset() to return to the first segment; confirm.
// Target lies before the next index: NOTE(review) presumably handled by rewind(index); confirm.
83 if (index < currentReader.getNextIndex()) {
// Target lies after the next index: NOTE(review) presumably handled by forward(index); confirm.
85 } else if (index > currentReader.getNextIndex()) {
// Delegates positioning to the segment reader itself. NOTE(review): it is not visible whether this runs
// only when index equals the next index or unconditionally; confirm.
88 currentReader.reset(index);
93 * Rewinds the journal to the given index.
95 private void rewind(long index) {
// The target is at or before the index of the current segment, so a different segment may be needed.
96 if (currentSegment.index() >= index) {
// Look the segment up by the index just before the target -- presumably this yields the segment holding
// that preceding entry; verify against SegmentedJournal.getSegment.
97 JournalSegment<E> segment = journal.getSegment(index - 1);
// When no segment is found the current segment and its reader are kept.
98 if (segment != null) {
// Close the old reader before switching, then read from the segment that was found.
99 currentReader.close();
101 currentSegment = segment;
102 currentReader = currentSegment.createReader();
// Position the (possibly new) segment reader on the target and remember the entry it then reports as
// current, so getCurrentEntry() and getCurrentIndex() can fall back to it later.
106 currentReader.reset(index);
107 previousEntry = currentReader.getCurrentEntry();
111 * Fast forwards the journal to the given index.
113 private void forward(long index) {
// Loop while the next index is still below the target and hasNext() reports more entries; hasNext() may
// itself switch to the following segment.
// NOTE(review): the loop body is not visible in this text -- presumably it consumes one entry per
// iteration via next(); confirm.
114 while (getNextIndex() < index && hasNext()) {
120 public boolean hasNext() {
121 return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext();
125 public final Indexed<E> next() {
126 if (currentReader.hasNext()) {
127 previousEntry = currentReader.getCurrentEntry();
128 return currentReader.next();
130 if (moveToNextSegment()) {
131 return currentReader.next();
133 throw new NoSuchElementException();
137 public final void close() {
138 currentReader.close();
139 journal.closeReader(this);
/**
 * Attempts to continue reading in the segment that follows the current one. Callers in this class
 * ({@code hasNext()} and {@code next()}) treat a {@code true} result as "the reader now points at the
 * next segment".
 *
 * NOTE(review): the return statements of this method are not visible in this text; the notes below on
 * return values are assumptions to be confirmed.
 */
142 private boolean moveToNextSegment() {
// Ask the journal for the segment after the current one, keyed by the current segment's index.
143 final var nextSegment = journal.getNextSegment(currentSegment.index());
// There is no following segment, or it does not start at the index this reader expects next.
// NOTE(review): branch body not shown -- presumably returns false without touching any state; confirm.
144 if (nextSegment == null || nextSegment.index() != getNextIndex()) {
// Remember the entry the old segment reader stands on before that reader is closed, so it can still be
// reported if the new segment turns out to be empty.
148 previousEntry = currentReader.getCurrentEntry();
149 currentReader.close();
// Switch to the following segment and open a reader on it.
// NOTE(review): presumably followed by "return true"; confirm.
151 currentSegment = nextSegment;
152 currentReader = currentSegment.createReader();