2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import org.eclipse.jdt.annotation.NonNull;
24 * A {@link JournalReader} traversing all entries.
26 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
27 // Marker non-null object for tryAdvance()
28 private static final @NonNull Object ADVANCED = new Object();
30 final SegmentedJournal<E> journal;
32 private JournalSegment currentSegment;
33 private JournalSegmentReader currentReader;
34 private long nextIndex;
36 SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
37 this.journal = requireNonNull(journal);
38 currentSegment = requireNonNull(segment);
39 currentReader = segment.createReader();
40 nextIndex = currentSegment.firstIndex();
44 public final long getFirstIndex() {
45 return journal.getFirstSegment().firstIndex();
49 public final long getNextIndex() {
54 public final void reset() {
55 currentReader.close();
57 currentSegment = journal.getFirstSegment();
58 currentReader = currentSegment.createReader();
59 nextIndex = currentSegment.firstIndex();
63 public final void reset(final long index) {
64 // If the current segment is not open, it has been replaced. Reset the segments.
65 if (!currentSegment.isOpen()) {
69 if (index < nextIndex) {
71 } else if (index > nextIndex) {
72 while (index > nextIndex && tryAdvance()) {
76 resetCurrentReader(index);
80 private void resetCurrentReader(final long index) {
81 final var position = currentSegment.lookup(index - 1);
82 if (position != null) {
83 nextIndex = position.index();
84 currentReader.setPosition(position.position());
86 nextIndex = currentSegment.firstIndex();
87 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
89 while (nextIndex < index && tryAdvance()) {
95 * Rewinds the journal to the given index.
97 private void rewind(final long index) {
98 if (currentSegment.firstIndex() >= index) {
99 JournalSegment segment = journal.getSegment(index - 1);
100 if (segment != null) {
101 currentReader.close();
103 currentSegment = segment;
104 currentReader = currentSegment.createReader();
108 resetCurrentReader(index);
112 public <T> T tryNext(final EntryMapper<E, T> mapper) {
113 final var index = nextIndex;
114 var buf = currentReader.readBytes(index);
116 final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
117 if (nextSegment == null || nextSegment.firstIndex() != index) {
121 currentReader.close();
123 currentSegment = nextSegment;
124 currentReader = currentSegment.createReader();
125 buf = currentReader.readBytes(index);
131 final var entry = journal.serializer().deserialize(buf);
132 final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes()));
133 nextIndex = index + 1;
138 * Try to move to the next entry.
140 * @return {@code true} if there was a next entry and this reader has moved to it
142 final boolean tryAdvance() {
143 return tryNext((index, entry, size) -> ADVANCED) != null;
147 public final void close() {
148 currentReader.close();
149 journal.closeReader(this);