2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.NonNull;
25 * A {@link ByteBufReader} implementation.
27 sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader {
28 final @NonNull SegmentedByteBufJournal journal;
30 private JournalSegment currentSegment;
31 private JournalSegmentReader currentReader;
32 private long nextIndex;
34 SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
35 this.journal = requireNonNull(journal);
36 currentSegment = requireNonNull(segment);
37 currentReader = segment.createReader();
38 nextIndex = currentSegment.firstIndex();
42 public final long firstIndex() {
43 return journal.firstSegment().firstIndex();
47 public final long nextIndex() {
52 public final void reset() {
53 currentReader.close();
54 currentSegment = journal.firstSegment();
55 currentReader = currentSegment.createReader();
56 nextIndex = currentSegment.firstIndex();
60 public final void reset(final long index) {
61 // If the current segment is not open, it has been replaced. Reset the segments.
62 if (!currentSegment.isOpen()) {
65 if (index < nextIndex) {
67 } else if (index > nextIndex) {
70 resetCurrentReader(index);
74 private void resetCurrentReader(final long index) {
75 final var position = currentSegment.lookup(index - 1);
76 if (position != null) {
77 nextIndex = position.index();
78 currentReader.setPosition(position.position());
80 nextIndex = currentSegment.firstIndex();
81 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
87 * Rewinds the journal to the given index.
89 private void rewind(final long index) {
90 if (currentSegment.firstIndex() >= index) {
91 final var segment = journal.segment(index - 1);
92 if (segment != null) {
93 currentReader.close();
94 currentSegment = segment;
95 currentReader = currentSegment.createReader();
98 resetCurrentReader(index);
101 private void forwardTo(final long index) {
102 while (nextIndex < index && tryAdvance(nextIndex) != null) {
103 // No-op -- nextIndex value is updated in tryAdvance()
108 public final <T> T tryNext(final EntryMapper<T> entryMapper) {
109 final var index = nextIndex;
110 final var bytes = tryAdvance(index);
111 return bytes == null ? null : entryMapper.mapEntry(index, bytes);
115 * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
116 * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
119 * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
120 * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
122 * @param index next index
123 * @return Entry bytes, or {@code null}
125 ByteBuf tryAdvance(final long index) {
126 var buf = currentReader.readBytes();
128 final var nextSegment = journal.nextSegment(currentSegment.firstIndex());
129 if (nextSegment == null || nextSegment.firstIndex() != index) {
132 currentReader.close();
133 currentSegment = nextSegment;
134 currentReader = currentSegment.createReader();
135 buf = currentReader.readBytes();
140 nextIndex = index + 1;
145 public final void close() {
146 currentReader.close();
147 journal.closeReader(this);