Eliminate MappableJournalSegmentReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package io.atomix.storage.journal;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.atomix.storage.journal.index.JournalIndex;
13 import io.atomix.storage.journal.index.Position;
14 import java.util.NoSuchElementException;
15 import org.eclipse.jdt.annotation.Nullable;
16
17 abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
18         permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
19     final int maxEntrySize;
20     private final JournalIndex index;
21     final JournalSerdes namespace;
22     private final long firstIndex;
23     private final JournalSegment<E> segment;
24
25     private Indexed<E> currentEntry;
26     private Indexed<E> nextEntry;
27
28     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
29             final JournalSerdes namespace) {
30         this.segment = requireNonNull(segment);
31         this.maxEntrySize = maxEntrySize;
32         this.index = requireNonNull(index);
33         this.namespace = requireNonNull(namespace);
34         firstIndex = segment.index();
35     }
36
37     @Override
38     public final long getFirstIndex() {
39         return firstIndex;
40     }
41
42     @Override
43     public final long getCurrentIndex() {
44         return currentEntry != null ? currentEntry.index() : 0;
45     }
46
47     @Override
48     public final Indexed<E> getCurrentEntry() {
49         return currentEntry;
50     }
51
52     @Override
53     public final long getNextIndex() {
54         return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
55     }
56
57     @Override
58     public final boolean hasNext() {
59         return nextEntry != null || (nextEntry = readNext()) != null;
60     }
61
62     @Override
63     public final Indexed<E> next() {
64         if (!hasNext()) {
65             throw new NoSuchElementException();
66         }
67
68         // Set the current entry to the next entry.
69         currentEntry = nextEntry;
70
71         // Reset the next entry to null.
72         nextEntry = null;
73
74         // Read the next entry in the segment.
75         nextEntry = readNext();
76
77         // Return the current entry.
78         return currentEntry;
79     }
80
81     @Override
82     public final void reset() {
83         currentEntry = null;
84         nextEntry = null;
85         setPosition(JournalSegmentDescriptor.BYTES);
86         nextEntry = readNext();
87     }
88
89     @Override
90     public final void reset(final long index) {
91         reset();
92         Position position = this.index.lookup(index - 1);
93         if (position != null) {
94             currentEntry = new Indexed<>(position.index() - 1, null, 0);
95             setPosition(position.position());
96             nextEntry = readNext();
97         }
98         while (getNextIndex() < index && hasNext()) {
99             next();
100         }
101     }
102
103     @Override
104     public final void close() {
105         segment.closeReader(this);
106     }
107
108     /**
109      * Set the file position.
110      *
111      * @param position new position
112      */
113     abstract void setPosition(int position);
114
115     /**
116      * Reads the next entry in the segment.
117      *
118      * @return Next entry, or {@code null}
119      */
120     abstract @Nullable Indexed<E> readNext();
121 }