Rename ByteBufJournal
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.controller.raft.journal.EntryReader;
24 import org.opendaylight.controller.raft.journal.FromByteBufMapper;
25
26 /**
27  * A {@link EntryReader} implementation.
28  */
29 sealed class SegmentedByteBufReader implements EntryReader permits SegmentedCommitsByteBufReader {
30     final @NonNull SegmentedByteBufJournal journal;
31
32     private JournalSegment currentSegment;
33     private JournalSegmentReader currentReader;
34     private long nextIndex;
35
36     SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
37         this.journal = requireNonNull(journal);
38         currentSegment = requireNonNull(segment);
39         currentReader = segment.createReader();
40         nextIndex = currentSegment.firstIndex();
41     }
42
43     @Override
44     public final long nextIndex() {
45         return nextIndex;
46     }
47
48     @Override
49     public final void reset() {
50         currentReader.close();
51         currentSegment = journal.firstSegment();
52         currentReader = currentSegment.createReader();
53         nextIndex = currentSegment.firstIndex();
54     }
55
56     @Override
57     public final void reset(final long index) {
58         // If the current segment is not open, it has been replaced. Reset the segments.
59         if (!currentSegment.isOpen()) {
60             reset();
61         }
62         if (index < nextIndex) {
63             rewind(index);
64         } else if (index > nextIndex) {
65             forwardTo(index);
66         } else {
67             resetCurrentReader(index);
68         }
69     }
70
71     private void resetCurrentReader(final long index) {
72         final var position = currentSegment.lookup(index - 1);
73         if (position != null) {
74             nextIndex = position.index();
75             currentReader.setPosition(position.position());
76         } else {
77             nextIndex = currentSegment.firstIndex();
78             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
79         }
80         forwardTo(index);
81     }
82
83     /**
84      * Rewinds the journal to the given index.
85      */
86     private void rewind(final long index) {
87         if (currentSegment.firstIndex() >= index) {
88             final var segment = journal.segment(index - 1);
89             if (segment != null) {
90                 currentReader.close();
91                 currentSegment = segment;
92                 currentReader = currentSegment.createReader();
93             }
94         }
95         resetCurrentReader(index);
96     }
97
98     private void forwardTo(final long index) {
99         while (nextIndex < index && tryAdvance(nextIndex) != null) {
100             // No-op -- nextIndex value is updated in tryAdvance()
101         }
102     }
103
104     @Override
105     public final <T> T tryNext(final FromByteBufMapper<T> mapper) {
106         final var index = nextIndex;
107         final var bytes = tryAdvance(index);
108         return bytes == null ? null : mapper.bytesToObject(index, bytes);
109     }
110
111     /**
112      * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
113      * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
114      *
115      * <p>
116      * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
117      * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
118      *
119      * @param index next index
120      * @return Entry bytes, or {@code null}
121      */
122     ByteBuf tryAdvance(final long index) {
123         var buf = currentReader.readBytes();
124         if (buf == null) {
125             final var nextSegment = journal.tryNextSegment(currentSegment.firstIndex());
126             if (nextSegment == null || nextSegment.firstIndex() != index) {
127                 return null;
128             }
129             currentReader.close();
130             currentSegment = nextSegment;
131             currentReader = currentSegment.createReader();
132             buf = currentReader.readBytes();
133             if (buf == null) {
134                 return null;
135             }
136         }
137         nextIndex = index + 1;
138         return buf;
139     }
140
141     @Override
142     public final void close() {
143         currentReader.close();
144         journal.closeReader(this);
145     }
146 }