Separate byte-level atomic-storage access
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22 import org.eclipse.jdt.annotation.NonNull;
23
24 /**
25  * A {@link ByteBufReader} implementation.
26  */
27 sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader {
28     final @NonNull SegmentedByteBufJournal journal;
29
30     private JournalSegment currentSegment;
31     private JournalSegmentReader currentReader;
32     private long nextIndex;
33
34     SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
35         this.journal = requireNonNull(journal);
36         currentSegment = requireNonNull(segment);
37         currentReader = segment.createReader();
38         nextIndex = currentSegment.firstIndex();
39     }
40
41     @Override
42     public final long firstIndex() {
43         return journal.firstSegment().firstIndex();
44     }
45
46     @Override
47     public final long nextIndex() {
48         return nextIndex;
49     }
50
51     @Override
52     public final void reset() {
53         currentReader.close();
54         currentSegment = journal.firstSegment();
55         currentReader = currentSegment.createReader();
56         nextIndex = currentSegment.firstIndex();
57     }
58
59     @Override
60     public final void reset(final long index) {
61         // If the current segment is not open, it has been replaced. Reset the segments.
62         if (!currentSegment.isOpen()) {
63             reset();
64         }
65         if (index < nextIndex) {
66             rewind(index);
67         } else if (index > nextIndex) {
68             forwardTo(index);
69         } else {
70             resetCurrentReader(index);
71         }
72     }
73
74     private void resetCurrentReader(final long index) {
75         final var position = currentSegment.lookup(index - 1);
76         if (position != null) {
77             nextIndex = position.index();
78             currentReader.setPosition(position.position());
79         } else {
80             nextIndex = currentSegment.firstIndex();
81             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
82         }
83         forwardTo(index);
84     }
85
86     /**
87      * Rewinds the journal to the given index.
88      */
89     private void rewind(final long index) {
90         if (currentSegment.firstIndex() >= index) {
91             final var segment = journal.segment(index - 1);
92             if (segment != null) {
93                 currentReader.close();
94                 currentSegment = segment;
95                 currentReader = currentSegment.createReader();
96             }
97         }
98         resetCurrentReader(index);
99     }
100
101     private void forwardTo(final long index) {
102         while (nextIndex < index && tryAdvance(nextIndex) != null) {
103             // No-op -- nextIndex value is updated in tryAdvance()
104         }
105     }
106
107     @Override
108     public final <T> T tryNext(final EntryMapper<T> entryMapper) {
109         final var index = nextIndex;
110         final var bytes = tryAdvance(index);
111         return bytes == null ? null : entryMapper.mapEntry(index, bytes);
112     }
113
114     /**
115      * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
116      * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
117      *
118      * <p>
119      * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
120      * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
121      *
122      * @param index next index
123      * @return Entry bytes, or {@code null}
124      */
125     ByteBuf tryAdvance(final long index) {
126         var buf = currentReader.readBytes();
127         if (buf == null) {
128             final var nextSegment = journal.nextSegment(currentSegment.firstIndex());
129             if (nextSegment == null || nextSegment.firstIndex() != index) {
130                 return null;
131             }
132             currentReader.close();
133             currentSegment = nextSegment;
134             currentReader = currentSegment.createReader();
135             buf = currentReader.readBytes();
136             if (buf == null) {
137                 return null;
138             }
139         }
140         nextIndex = index + 1;
141         return buf;
142     }
143
144     @Override
145     public final void close() {
146         currentReader.close();
147         journal.closeReader(this);
148     }
149 }