Improve segmented journal actor metrics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 /**
22  * A {@link JournalReader} traversing all entries.
23  */
24 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
25     final SegmentedJournal<E> journal;
26
27     private JournalSegment<E> currentSegment;
28     private JournalSegmentReader<E> currentReader;
29     private Indexed<E> currentEntry;
30     private long nextIndex;
31
32     SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
33         this.journal = requireNonNull(journal);
34         currentSegment = requireNonNull(segment);
35         currentReader = segment.createReader();
36         nextIndex = currentSegment.firstIndex();
37         currentEntry = null;
38     }
39
40     @Override
41     public final long getFirstIndex() {
42         return journal.getFirstSegment().firstIndex();
43     }
44
45     @Override
46     public final Indexed<E> getCurrentEntry() {
47         return currentEntry;
48     }
49
50     @Override
51     public final long getNextIndex() {
52         return nextIndex;
53     }
54
55     @Override
56     public final void reset() {
57         currentReader.close();
58
59         currentSegment = journal.getFirstSegment();
60         currentReader = currentSegment.createReader();
61         nextIndex = currentSegment.firstIndex();
62         currentEntry = null;
63     }
64
65     @Override
66     public final void reset(final long index) {
67         // If the current segment is not open, it has been replaced. Reset the segments.
68         if (!currentSegment.isOpen()) {
69             reset();
70         }
71
72         if (index < nextIndex) {
73             rewind(index);
74         } else if (index > nextIndex) {
75             while (index > nextIndex && tryNext() != null) {
76                 // Nothing else
77             }
78         } else {
79             resetCurrentReader(index);
80         }
81     }
82
83     private void resetCurrentReader(final long index) {
84         final var position = currentSegment.lookup(index - 1);
85         if (position != null) {
86             nextIndex = position.index();
87             currentReader.setPosition(position.position());
88         } else {
89             nextIndex = currentSegment.firstIndex();
90             currentReader.setPosition(JournalSegmentDescriptor.BYTES);
91         }
92         while (nextIndex < index && tryNext() != null) {
93             // Nothing else
94         }
95     }
96
97     /**
98      * Rewinds the journal to the given index.
99      */
100     private void rewind(final long index) {
101         if (currentSegment.firstIndex() >= index) {
102             JournalSegment<E> segment = journal.getSegment(index - 1);
103             if (segment != null) {
104                 currentReader.close();
105
106                 currentSegment = segment;
107                 currentReader = currentSegment.createReader();
108             }
109         }
110
111         resetCurrentReader(index);
112     }
113
114     @Override
115     public Indexed<E> tryNext() {
116         var next = currentReader.readEntry(nextIndex);
117         if (next == null) {
118             final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
119             if (nextSegment == null || nextSegment.firstIndex() != nextIndex) {
120                 return null;
121             }
122
123             currentReader.close();
124
125             currentSegment = nextSegment;
126             currentReader = currentSegment.createReader();
127             next = currentReader.readEntry(nextIndex);
128             if (next == null) {
129                 return null;
130             }
131         }
132
133         nextIndex = nextIndex + 1;
134         currentEntry = next;
135         return next;
136     }
137
138     @Override
139     public final void close() {
140         currentReader.close();
141         journal.closeReader(this);
142     }
143 }