2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.controller.raft.journal.EntryReader;
24 import org.opendaylight.controller.raft.journal.FromByteBufMapper;
27 * A {@link EntryReader} implementation.
29 sealed class SegmentedByteBufReader implements EntryReader permits SegmentedCommitsByteBufReader {
30 final @NonNull SegmentedByteBufJournal journal;
32 private JournalSegment currentSegment;
33 private JournalSegmentReader currentReader;
34 private long nextIndex;
36 SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
37 this.journal = requireNonNull(journal);
38 currentSegment = requireNonNull(segment);
39 currentReader = segment.createReader();
40 nextIndex = currentSegment.firstIndex();
44 public final long nextIndex() {
49 public final void reset() {
50 currentReader.close();
51 currentSegment = journal.firstSegment();
52 currentReader = currentSegment.createReader();
53 nextIndex = currentSegment.firstIndex();
57 public final void reset(final long index) {
58 // If the current segment is not open, it has been replaced. Reset the segments.
59 if (!currentSegment.isOpen()) {
62 if (index < nextIndex) {
64 } else if (index > nextIndex) {
67 resetCurrentReader(index);
71 private void resetCurrentReader(final long index) {
72 final var position = currentSegment.lookup(index - 1);
73 if (position != null) {
74 nextIndex = position.index();
75 currentReader.setPosition(position.position());
77 nextIndex = currentSegment.firstIndex();
78 currentReader.setPosition(JournalSegmentDescriptor.BYTES);
84 * Rewinds the journal to the given index.
86 private void rewind(final long index) {
87 if (currentSegment.firstIndex() >= index) {
88 final var segment = journal.segment(index - 1);
89 if (segment != null) {
90 currentReader.close();
91 currentSegment = segment;
92 currentReader = currentSegment.createReader();
95 resetCurrentReader(index);
98 private void forwardTo(final long index) {
99 while (nextIndex < index && tryAdvance(nextIndex) != null) {
100 // No-op -- nextIndex value is updated in tryAdvance()
105 public final <T> T tryNext(final FromByteBufMapper<T> mapper) {
106 final var index = nextIndex;
107 final var bytes = tryAdvance(index);
108 return bytes == null ? null : mapper.bytesToObject(index, bytes);
112 * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
113 * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
116 * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
117 * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
119 * @param index next index
120 * @return Entry bytes, or {@code null}
122 ByteBuf tryAdvance(final long index) {
123 var buf = currentReader.readBytes();
125 final var nextSegment = journal.tryNextSegment(currentSegment.firstIndex());
126 if (nextSegment == null || nextSegment.firstIndex() != index) {
129 currentReader.close();
130 currentSegment = nextSegment;
131 currentReader = currentSegment.createReader();
132 buf = currentReader.readBytes();
137 nextIndex = index + 1;
142 public final void close() {
143 currentReader.close();
144 journal.closeReader(this);