2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
22 * A {@link JournalReader} traversing all entries.
// Package-private, sealed JournalReader implementation backed by a SegmentedJournal.
// The only permitted subclass is CommitsSegmentJournalReader.
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
// Owning journal: used to look up segments and to be notified when this reader is closed.
25 final SegmentedJournal<E> journal;
// Segment the reader is currently positioned in.
27 private JournalSegment<E> currentSegment;
// Reader created from currentSegment; it is closed and re-created whenever currentSegment changes.
28 private JournalSegmentReader<E> currentReader;
// Entry reported by the getCurrent*() accessors; null means there is no current entry.
29 private Indexed<E> currentEntry;
// Index handed to currentReader.readEntry() by the next tryNext() call.
30 private long nextIndex;
// Creates a reader over the given journal, starting in the given segment.
// Both arguments are null-checked with requireNonNull, so a null argument raises NullPointerException.
32 SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
33 this.journal = requireNonNull(journal);
34 currentSegment = requireNonNull(segment);
// Open a reader on the starting segment.
35 currentReader = segment.createReader();
// Reading begins at the index reported by the starting segment.
36 nextIndex = currentSegment.index();
// Returns the index of the journal's first segment, as reported by journal.getFirstSegment().
41 public final long getFirstIndex() {
42 return journal.getFirstSegment().index();
// Returns the index of the current entry, or 0 when there is no current entry (currentEntry is null).
46 public final long getCurrentIndex() {
47 return currentEntry != null ? currentEntry.index() : 0;
// Accessor for the current entry.
// NOTE(review): presumably returns the currentEntry field (may be null) — confirm against the method body.
51 public final Indexed<E> getCurrentEntry() {
// Accessor for the index of the next entry to be read.
// NOTE(review): presumably returns the nextIndex field — confirm against the method body.
56 public final long getNextIndex() {
// Resets the reader to the beginning of the journal.
61 public final void reset() {
// Release the reader of the segment being left before switching.
62 currentReader.close();
// Re-position on the journal's first segment and read from that segment's index.
64 currentSegment = journal.getFirstSegment();
65 currentReader = currentSegment.createReader();
66 nextIndex = currentSegment.index();
// Repositions the reader for the requested index by comparing it with nextIndex.
71 public final void reset(final long index) {
72 // If the current segment is not open, it has been replaced. Reset the segments.
73 if (!currentSegment.isOpen()) {
// Requested index is behind the read position.
// NOTE(review): presumably this branch delegates to rewind(index) — confirm against the branch body.
77 if (index < nextIndex) {
79 } else if (index > nextIndex) {
// Requested index is ahead: consume entries until nextIndex reaches it or tryNext() returns null.
80 while (index > nextIndex && tryNext() != null) {
// Re-position the current segment's reader for the requested index.
// NOTE(review): presumably the index == nextIndex case — confirm which branch this call belongs to.
84 resetCurrentReader(index);
// Positions currentReader inside currentSegment, then reads forward until nextIndex reaches the
// requested index or tryNext() returns null.
88 private void resetCurrentReader(final long index) {
// Ask the segment for a position for the entry just before the target (index - 1).
89 final var position = currentSegment.lookup(index - 1);
90 if (position != null) {
// Resume from the index and reader position returned by the lookup.
91 nextIndex = position.index();
92 currentReader.setPosition(position.position());
// Start from the segment's own index instead.
// NOTE(review): presumably the fallback when the lookup returns null, with
// JournalSegmentDescriptor.BYTES being the offset of the first entry — confirm.
94 nextIndex = currentSegment.index();
95 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
// Skip forward one entry at a time; stops early if tryNext() returns null.
97 while (nextIndex < index && tryNext() != null) {
103 * Rewinds the journal to the given index.
// Moves the reader back to the given index, switching to an earlier segment when needed.
105 private void rewind(final long index) {
// The target is not past the current segment's index, so the entry before it may be in another segment.
106 if (currentSegment.index() >= index) {
// Look up the segment for the preceding entry (index - 1).
107 JournalSegment<E> segment = journal.getSegment(index - 1);
// Switch only when the journal returned a segment.
108 if (segment != null) {
// Close the old reader before replacing the segment and its reader.
109 currentReader.close();
111 currentSegment = segment;
112 currentReader = currentSegment.createReader();
// Position the segment reader for the requested index.
116 resetCurrentReader(index);
// Attempts to read the entry at nextIndex, moving on to the following segment when needed.
// Unlike the other public methods here, this one is not declared final.
120 public Indexed<E> tryNext() {
// First attempt: read from the current segment.
121 var next = currentReader.readEntry(nextIndex);
// Ask the journal for the segment that follows the current one.
// NOTE(review): presumably only reached when `next` is null — confirm against the guarding condition.
123 final var nextSegment = journal.getNextSegment(currentSegment.index());
// Continuing requires a following segment whose index is exactly nextIndex.
124 if (nextSegment == null || nextSegment.index() != nextIndex) {
// Switch to the following segment: close the old reader, open a new one and retry the read.
128 currentReader.close();
130 currentSegment = nextSegment;
131 currentReader = currentSegment.createReader();
132 next = currentReader.readEntry(nextIndex);
// Advance the read position by one entry.
138 nextIndex = nextIndex + 1;
// Closes the current segment reader and then informs the journal via closeReader(this).
144 public final void close() {
145 currentReader.close();
146 journal.closeReader(this);