Eliminate MappableJournalSegmentReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
index 23ef1f320b2bc299aa56227f587378a6be7246e5..2dddd9827633cb92b15cd30d1e3be245557c8fb0 100644 (file)
@@ -10,20 +10,28 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
+import java.util.NoSuchElementException;
+import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
         permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
     final int maxEntrySize;
-    final JournalIndex index;
+    private final JournalIndex index;
     final JournalSerdes namespace;
-    final long firstIndex;
+    private final long firstIndex;
+    private final JournalSegment<E> segment;
+
+    private Indexed<E> currentEntry;
+    private Indexed<E> nextEntry;
 
     JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
             final JournalSerdes namespace) {
+        this.segment = requireNonNull(segment);
         this.maxEntrySize = maxEntrySize;
         this.index = requireNonNull(index);
         this.namespace = requireNonNull(namespace);
-        this.firstIndex = segment.index();
+        firstIndex = segment.index();
     }
 
     @Override
@@ -31,8 +39,83 @@ abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
         return firstIndex;
     }
 
+    @Override
+    public final long getCurrentIndex() {
+        return currentEntry != null ? currentEntry.index() : 0;
+    }
+
+    @Override
+    public final Indexed<E> getCurrentEntry() {
+        return currentEntry;
+    }
+
+    @Override
+    public final long getNextIndex() {
+        return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
+    }
+
+    @Override
+    public final boolean hasNext() {
+        return nextEntry != null || (nextEntry = readNext()) != null;
+    }
+
+    @Override
+    public final Indexed<E> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        // Set the current entry to the next entry.
+        currentEntry = nextEntry;
+
+        // Reset the next entry to null.
+        nextEntry = null;
+
+        // Read the next entry in the segment.
+        nextEntry = readNext();
+
+        // Return the current entry.
+        return currentEntry;
+    }
+
+    @Override
+    public final void reset() {
+        currentEntry = null;
+        nextEntry = null;
+        setPosition(JournalSegmentDescriptor.BYTES);
+        nextEntry = readNext();
+    }
+
+    @Override
+    public final void reset(final long index) {
+        reset();
+        Position position = this.index.lookup(index - 1);
+        if (position != null) {
+            currentEntry = new Indexed<>(position.index() - 1, null, 0);
+            setPosition(position.position());
+            nextEntry = readNext();
+        }
+        while (getNextIndex() < index && hasNext()) {
+            next();
+        }
+    }
+
     @Override
     public final void close() {
-        // FIXME: CONTROLLER-2098: remove this method
+        segment.closeReader(this);
     }
+
+    /**
+     * Set the file position.
+     *
+     * @param position new position
+     */
+    abstract void setPosition(int position);
+
+    /**
+     * Reads the next entry in the segment.
+     *
+     * @return Next entry, or {@code null}
+     */
+    abstract @Nullable Indexed<E> readNext();
 }