2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static java.util.Objects.requireNonNull;
21 * A {@link JournalReader} traversing all entries.
23 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
24 final SegmentedJournal<E> journal;
25 private JournalSegment<E> currentSegment;
26 private Indexed<E> previousEntry;
27 private JournalSegmentReader<E> currentReader;
29 SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
30 this.journal = requireNonNull(journal);
31 currentSegment = requireNonNull(segment);
32 currentReader = segment.createReader();
36 public final long getFirstIndex() {
37 return journal.getFirstSegment().index();
41 public final long getCurrentIndex() {
42 final var currentEntry = currentReader.getCurrentEntry();
43 if (currentEntry != null) {
44 final long currentIndex = currentEntry.index();
45 if (currentIndex != 0) {
49 return previousEntry != null ? previousEntry.index() : 0;
53 public final Indexed<E> getCurrentEntry() {
54 // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
55 // That segment may be empty, though, in which case we need to report the previousEntry.
56 final Indexed<E> currentEntry;
57 return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
61 public final long getNextIndex() {
62 return currentReader.getNextIndex();
66 public final void reset() {
68 currentReader.close();
70 currentSegment = journal.getFirstSegment();
71 currentReader = currentSegment.createReader();
75 public final void reset(long index) {
76 // If the current segment is not open, it has been replaced. Reset the segments.
77 if (!currentSegment.isOpen()) {
81 final var nextIndex = currentReader.getNextIndex();
82 if (index < nextIndex) {
84 } else if (index > nextIndex) {
87 currentReader.reset(index);
92 * Rewinds the journal to the given index.
94 private void rewind(long index) {
95 if (currentSegment.index() >= index) {
96 JournalSegment<E> segment = journal.getSegment(index - 1);
97 if (segment != null) {
98 currentReader.close();
100 currentSegment = segment;
101 currentReader = currentSegment.createReader();
105 currentReader.reset(index);
106 previousEntry = currentReader.getCurrentEntry();
110 * Fast forwards the journal to the given index.
112 private void forward(long index) {
113 while (getNextIndex() < index && tryNext() != null) {
119 public Indexed<E> tryNext() {
120 if (currentReader.hasNext()) {
121 previousEntry = currentReader.getCurrentEntry();
122 return currentReader.next();
125 final var nextSegment = journal.getNextSegment(currentSegment.index());
126 if (nextSegment == null || nextSegment.index() != getNextIndex()) {
130 previousEntry = currentReader.getCurrentEntry();
131 currentReader.close();
133 currentSegment = nextSegment;
134 currentReader = currentSegment.createReader();
135 return currentReader.hasNext() ? currentReader.next() : null;
139 public final void close() {
140 currentReader.close();
141 journal.closeReader(this);