/*
 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
/**
 * A {@link JournalReader} traversing all entries.
 */
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25 final SegmentedJournal<E> journal;
26 private JournalSegment<E> currentSegment;
27 private Indexed<E> previousEntry;
28 private JournalSegmentReader<E> currentReader;
/**
 * Creates a reader over {@code journal}, initially positioned on {@code segment}.
 *
 * @param journal the journal this reader traverses, must not be {@code null}
 * @param segment the segment to open the first segment reader on, must not be {@code null}
 * @throws NullPointerException if {@code journal} or {@code segment} is {@code null}
 */
SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
  this.journal = requireNonNull(journal);
  this.currentSegment = requireNonNull(segment);
  // The segment has just been null-checked, so the reader can be opened from the field.
  this.currentReader = this.currentSegment.createReader();
}
37 public final long getFirstIndex() {
38 return journal.getFirstSegment().index();
42 public final long getCurrentIndex() {
43 final var currentEntry = currentReader.getCurrentEntry();
44 if (currentEntry != null) {
45 final long currentIndex = currentEntry.index();
46 if (currentIndex != 0) {
50 return previousEntry != null ? previousEntry.index() : 0;
54 public final Indexed<E> getCurrentEntry() {
55 // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
56 // That segment may be empty, though, in which case we need to report the previousEntry.
57 final Indexed<E> currentEntry;
58 return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
62 public final long getNextIndex() {
63 return currentReader.getNextIndex();
67 public final void reset() {
69 currentReader.close();
71 currentSegment = journal.getFirstSegment();
72 currentReader = currentSegment.createReader();
76 public final void reset(long index) {
77 // If the current segment is not open, it has been replaced. Reset the segments.
78 if (!currentSegment.isOpen()) {
82 final var nextIndex = currentReader.getNextIndex();
83 if (index < nextIndex) {
85 } else if (index > nextIndex) {
88 currentReader.reset(index);
93 * Rewinds the journal to the given index.
95 private void rewind(long index) {
96 if (currentSegment.index() >= index) {
97 JournalSegment<E> segment = journal.getSegment(index - 1);
98 if (segment != null) {
99 currentReader.close();
101 currentSegment = segment;
102 currentReader = currentSegment.createReader();
106 currentReader.reset(index);
107 previousEntry = currentReader.getCurrentEntry();
111 * Fast forwards the journal to the given index.
113 private void forward(long index) {
114 while (getNextIndex() < index && tryNext() != null) {
120 public Indexed<E> tryNext() {
121 if (currentReader.hasNext()) {
122 previousEntry = currentReader.getCurrentEntry();
123 return currentReader.next();
126 final var nextSegment = journal.getNextSegment(currentSegment.index());
127 if (nextSegment == null || nextSegment.index() != getNextIndex()) {
131 previousEntry = currentReader.getCurrentEntry();
132 currentReader.close();
134 currentSegment = nextSegment;
135 currentReader = currentSegment.createReader();
136 return currentReader.hasNext() ? currentReader.next() : null;
140 public final void close() {
141 currentReader.close();
142 journal.closeReader(this);