Bump versions 9.0.4-SNAPSHOT
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalReader.java
index de57d520e2896eb164707b1504eecdc83dc30844..4021518ef5273eba5486b36b5f0142f9bf040f07 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,131 +18,49 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.raft.journal.EntryReader;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+
 /**
- * A {@link JournalReader} traversing all entries.
+ * A {@link JournalReader} backed by a {@link EntryReader}.
  */
-sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
-    final SegmentedJournal<E> journal;
-
-    private JournalSegment<E> currentSegment;
-    private JournalSegmentReader<E> currentReader;
-    private Indexed<E> currentEntry;
-    private long nextIndex;
-
-    SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
-        this.journal = requireNonNull(journal);
-        currentSegment = requireNonNull(segment);
-        currentReader = segment.createReader();
-        nextIndex = currentSegment.index();
-        currentEntry = null;
-    }
-
-    @Override
-    public final long getFirstIndex() {
-        return journal.getFirstSegment().index();
+@NonNullByDefault
+final class SegmentedJournalReader<E> implements JournalReader<E> {
+    private final FromByteBufMapper<E> mapper;
+    private final EntryReader reader;
+
+    SegmentedJournalReader(final EntryReader reader, final FromByteBufMapper<E> mapper) {
+        this.reader = requireNonNull(reader);
+        this.mapper = requireNonNull(mapper);
     }
 
     @Override
-    public final long getCurrentIndex() {
-        return currentEntry != null ? currentEntry.index() : 0;
+    public long getNextIndex() {
+        return reader.nextIndex();
     }
 
     @Override
-    public final Indexed<E> getCurrentEntry() {
-        return currentEntry;
+    public void reset() {
+        reader.reset();
     }
 
     @Override
-    public final long getNextIndex() {
-        return nextIndex;
+    public void reset(final long index) {
+        reader.reset(index);
     }
 
     @Override
-    public final void reset() {
-        currentReader.close();
-
-        currentSegment = journal.getFirstSegment();
-        currentReader = currentSegment.createReader();
-        nextIndex = currentSegment.index();
-        currentEntry = null;
-    }
-
-    @Override
-    public final void reset(final long index) {
-        // If the current segment is not open, it has been replaced. Reset the segments.
-        if (!currentSegment.isOpen()) {
-            reset();
-        }
-
-        if (index < nextIndex) {
-            rewind(index);
-        } else if (index > nextIndex) {
-            while (index > nextIndex && tryNext() != null) {
-                // Nothing else
-            }
-        } else {
-            resetCurrentReader(index);
-        }
-    }
-
-    private void resetCurrentReader(final long index) {
-        final var position = currentSegment.lookup(index - 1);
-        if (position != null) {
-            nextIndex = position.index();
-            currentReader.setPosition(position.position());
-        } else {
-            nextIndex = currentSegment.index();
-            currentReader.setPosition(JournalSegmentDescriptor.BYTES);
-        }
-        while (nextIndex < index && tryNext() != null) {
-            // Nothing else
-        }
-    }
-
-    /**
-     * Rewinds the journal to the given index.
-     */
-    private void rewind(final long index) {
-        if (currentSegment.index() >= index) {
-            JournalSegment<E> segment = journal.getSegment(index - 1);
-            if (segment != null) {
-                currentReader.close();
-
-                currentSegment = segment;
-                currentReader = currentSegment.createReader();
-            }
-        }
-
-        resetCurrentReader(index);
-    }
-
-    @Override
-    public Indexed<E> tryNext() {
-        var next = currentReader.readEntry(nextIndex);
-        if (next == null) {
-            final var nextSegment = journal.getNextSegment(currentSegment.index());
-            if (nextSegment == null || nextSegment.index() != nextIndex) {
-                return null;
-            }
-
-            currentReader.close();
-
-            currentSegment = nextSegment;
-            currentReader = currentSegment.createReader();
-            next = currentReader.readEntry(nextIndex);
-            if (next == null) {
-                return null;
-            }
-        }
-
-        nextIndex = nextIndex + 1;
-        currentEntry = next;
-        return next;
+    public <T> @Nullable T tryNext(final EntryMapper<E, T> entryMapper) {
+        return reader.tryNext((index, buf) -> {
+            final var size = buf.readableBytes();
+            return requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(index, buf), size));
+        });
     }
 
     @Override
-    public final void close() {
-        currentReader.close();
-        journal.closeReader(this);
+    public void close() {
+        reader.close();
     }
 }