2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
22 * A {@link JournalReader} traversing all entries.
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
// Journal being read; used to look up segments, obtain the entry serializer and report reader closure.
25 final SegmentedJournal<E> journal;
// Segment this reader is currently reading from.
27 private JournalSegment currentSegment;
// Reader created from currentSegment via createReader(); closed and re-created whenever currentSegment changes.
28 private JournalSegmentReader currentReader;
// Index passed to currentReader.readBytes() by the next tryNext(); set to index + 1 after an entry is returned.
29 private long nextIndex;
/**
 * Creates a reader over {@code journal} that starts at the first index of {@code segment}.
 *
 * @param journal the journal to read from, must not be {@code null}
 * @param segment the segment to start reading in, must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
    this.journal = requireNonNull(journal);
    this.currentSegment = requireNonNull(segment);
    // Open the segment reader first, then record where reading begins.
    this.currentReader = this.currentSegment.createReader();
    this.nextIndex = segment.firstIndex();
}
39 public final long getFirstIndex() {
40 return journal.getFirstSegment().firstIndex();
// NOTE(review): the body of this method is not visible in this excerpt; presumably it returns the
// nextIndex field — confirm against the complete file.
44 public final long getNextIndex() {
49 public final void reset() {
50 currentReader.close();
52 currentSegment = journal.getFirstSegment();
53 currentReader = currentSegment.createReader();
54 nextIndex = currentSegment.firstIndex();
/**
 * Repositions this reader with respect to {@code index}, choosing a strategy by comparing {@code index}
 * with the current {@code nextIndex}.
 *
 * NOTE(review): several statements of this method are not visible in this excerpt: the body of the
 * {@code isOpen()} check, the body of the {@code index < nextIndex} branch, and whatever separates the
 * forward loop from the trailing {@code resetCurrentReader(index)} call. Presumably these are a call to
 * {@code reset()}, a call to {@code rewind(index)} and an {@code else} respectively — confirm against the
 * complete file.
 *
 * @param index the index the reader is being reset to
 */
58 public final void reset(final long index) {
59 // If the current segment is not open, it has been replaced. Reset the segments.
60 if (!currentSegment.isOpen()) {
// Requested index lies before the next index to be read.
64 if (index < nextIndex) {
// Requested index lies ahead of the next index to be read.
66 } else if (index > nextIndex) {
// Advance through tryNext() until nextIndex reaches index or tryNext() returns null.
67 while (index > nextIndex && tryNext() != null) {
// Re-seek the reader of the current segment for the requested index.
71 resetCurrentReader(index);
/**
 * Seeks {@code currentReader} within {@code currentSegment} and then reads forward through
 * {@code tryNext()} until {@code nextIndex} reaches {@code index} or {@code tryNext()} returns {@code null}.
 *
 * @param index the index the reader is being positioned for
 */
75 private void resetCurrentReader(final long index) {
// Ask the segment for a known position for the entry just before index.
76 final var position = currentSegment.lookup(index - 1);
77 if (position != null) {
// A position was found: continue from the index and byte position it reports.
78 nextIndex = position.index();
79 currentReader.setPosition(position.position());
// NOTE(review): an "else" presumably separates the assignments above from those below, but the line is
// not visible in this excerpt — confirm against the complete file.
// Start from the segment's first index with the reader at JournalSegmentDescriptor.BYTES
// (presumably the offset just past the segment descriptor — confirm).
81 nextIndex = currentSegment.firstIndex();
82 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
// Read entries, ignoring their contents, until nextIndex reaches index or tryNext() returns null.
84 while (nextIndex < index && tryNext() != null) {
90 * Rewinds the journal to the given index.
92 private void rewind(final long index) {
93 if (currentSegment.firstIndex() >= index) {
94 JournalSegment segment = journal.getSegment(index - 1);
95 if (segment != null) {
96 currentReader.close();
98 currentSegment = segment;
99 currentReader = currentSegment.createReader();
103 resetCurrentReader(index);
/**
 * Attempts to read the entry at {@code nextIndex}, consulting the following segment as part of the attempt.
 *
 * NOTE(review): the conditionals guarding the segment switch and any early exits are not visible in this
 * excerpt. Presumably the following segment is consulted only when the first read yields no data, and the
 * method returns {@code null} when no entry can be read — confirm against the complete file.
 *
 * @return an {@code Indexed} holding the index that was read, the deserialized entry and the readable size
 *         of its buffer
 */
107 public Indexed<E> tryNext() {
// Remember the index being read; nextIndex is only advanced once an entry has been decoded.
108 final var index = nextIndex;
109 var buf = currentReader.readBytes(index);
// Look up the segment that follows the current one.
111 final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
// True when there is no following segment or it does not start exactly at the index being read.
// NOTE(review): the body of this branch is not visible here — presumably it gives up; confirm.
112 if (nextSegment == null || nextSegment.firstIndex() != index) {
// Switch to the following segment: close the old reader, open a new one and retry the same index.
116 currentReader.close();
118 currentSegment = nextSegment;
119 currentReader = currentSegment.createReader();
120 buf = currentReader.readBytes(index);
// Decode the entry with the journal's serializer and advance past it.
126 final var entry = journal.serializer().deserialize(buf);
127 nextIndex = index + 1;
128 return new Indexed<>(index, entry, buf.readableBytes());
132 public final void close() {
133 currentReader.close();
134 journal.closeReader(this);