2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
22 * A {@link JournalReader} traversing all entries.
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25 final SegmentedJournal<E> journal;
27 private JournalSegment<E> currentSegment;
28 private JournalSegmentReader<E> currentReader;
29 private Indexed<E> currentEntry;
30 private long nextIndex;
32 SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
33 this.journal = requireNonNull(journal);
34 currentSegment = requireNonNull(segment);
35 currentReader = segment.createReader();
36 nextIndex = currentSegment.firstIndex();
41 public final long getFirstIndex() {
42 return journal.getFirstSegment().firstIndex();
46 public final Indexed<E> getCurrentEntry() {
51 public final long getNextIndex() {
56 public final void reset() {
57 currentReader.close();
59 currentSegment = journal.getFirstSegment();
60 currentReader = currentSegment.createReader();
61 nextIndex = currentSegment.firstIndex();
66 public final void reset(final long index) {
67 // If the current segment is not open, it has been replaced. Reset the segments.
68 if (!currentSegment.isOpen()) {
72 if (index < nextIndex) {
74 } else if (index > nextIndex) {
75 while (index > nextIndex && tryNext() != null) {
79 resetCurrentReader(index);
83 private void resetCurrentReader(final long index) {
84 final var position = currentSegment.lookup(index - 1);
85 if (position != null) {
86 nextIndex = position.index();
87 currentReader.setPosition(position.position());
89 nextIndex = currentSegment.firstIndex();
90 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
92 while (nextIndex < index && tryNext() != null) {
98 * Rewinds the journal to the given index.
100 private void rewind(final long index) {
101 if (currentSegment.firstIndex() >= index) {
102 JournalSegment<E> segment = journal.getSegment(index - 1);
103 if (segment != null) {
104 currentReader.close();
106 currentSegment = segment;
107 currentReader = currentSegment.createReader();
111 resetCurrentReader(index);
115 public Indexed<E> tryNext() {
116 var next = currentReader.readEntry(nextIndex);
118 final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
119 if (nextSegment == null || nextSegment.firstIndex() != nextIndex) {
123 currentReader.close();
125 currentSegment = nextSegment;
126 currentReader = currentSegment.createReader();
127 next = currentReader.readEntry(nextIndex);
133 nextIndex = nextIndex + 1;
139 public final void close() {
140 currentReader.close();
141 journal.closeReader(this);