2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package io.atomix.storage.journal;
10 import static java.util.Objects.requireNonNull;
12 import io.atomix.storage.journal.index.JournalIndex;
13 import io.atomix.storage.journal.index.Position;
14 import java.util.NoSuchElementException;
15 import org.eclipse.jdt.annotation.Nullable;
17 abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
18 permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
19 final int maxEntrySize;
20 private final JournalIndex index;
21 final JournalSerdes namespace;
22 private final long firstIndex;
23 private final JournalSegment<E> segment;
25 private Indexed<E> currentEntry;
26 private Indexed<E> nextEntry;
28 JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
29 final JournalSerdes namespace) {
30 this.segment = requireNonNull(segment);
31 this.maxEntrySize = maxEntrySize;
32 this.index = requireNonNull(index);
33 this.namespace = requireNonNull(namespace);
34 firstIndex = segment.index();
38 public final long getFirstIndex() {
43 public final long getCurrentIndex() {
44 return currentEntry != null ? currentEntry.index() : 0;
48 public final Indexed<E> getCurrentEntry() {
53 public final long getNextIndex() {
54 return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
58 public final boolean hasNext() {
59 return nextEntry != null || (nextEntry = readNext()) != null;
63 public final Indexed<E> next() {
65 throw new NoSuchElementException();
68 // Set the current entry to the next entry.
69 currentEntry = nextEntry;
71 // Reset the next entry to null.
74 // Read the next entry in the segment.
75 nextEntry = readNext();
77 // Return the current entry.
82 public final void reset() {
85 setPosition(JournalSegmentDescriptor.BYTES);
86 nextEntry = readNext();
90 public final void reset(final long index) {
92 Position position = this.index.lookup(index - 1);
93 if (position != null) {
94 currentEntry = new Indexed<>(position.index() - 1, null, 0);
95 setPosition(position.position());
96 nextEntry = readNext();
98 while (getNextIndex() < index && hasNext()) {
104 public final void close() {
105 segment.closeReader(this);
109 * Set the file position.
111 * @param position new position
113 abstract void setPosition(int position);
116 * Reads the next entry in the segment.
118 * @return Next entry, or {@code null}
120 abstract @Nullable Indexed<E> readNext();