Move entry tracking to SegmentedJournalReader 98/110998/10
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 23 Mar 2024 13:51:23 +0000 (14:51 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 25 Mar 2024 07:44:43 +0000 (08:44 +0100)
We have duplicated state tracking just because JournalSegmentReader used
to provide the same API as SegmentedJournalReader.

Rehost tracking the next index and current entry into
SegmentedJournalReader, making it more obvious as to what is going on.

This allows Indexed to enforce the entry being non-null, as we do not
have to use a fake entry anymore.

JIRA: CONTROLLER-2109
Change-Id: I9ebd50fde94d695bb62315bba1b6adb1235b37f1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java

index 64ae58c484efee459c1c672400dc198632927d24..bf02a90fe0d4b248177d1c5f35d28dac754f32de 100644 (file)
@@ -16,7 +16,6 @@
  */
 package io.atomix.storage.journal;
 
-import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
@@ -37,12 +36,10 @@ final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
       FileChannel channel,
       JournalSegment<E> segment,
       int maxEntrySize,
-      JournalIndex index,
       JournalSerdes namespace) {
-    super(segment, maxEntrySize, index, namespace);
+    super(segment, maxEntrySize, namespace);
     this.channel = channel;
     this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
-    reset();
   }
 
   @Override
index 02e84d2b027bf46f1f81ad8300466138dbeb3bad..5bf7e6f4545f63aa9e823db05996782ac1d85094 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
  * Indexed journal entry.
@@ -26,10 +29,14 @@ import com.google.common.base.MoreObjects;
  * @param entry the indexed entry
  * @param size the serialized entry size
  */
-// FIXME: add @NonNullByDefault and enforce non-null entry once we can say that entries cannot be null
 // FIXME: it seems 'index' has to be non-zero, we should enforce that if that really is the case
 // FIXME: it seems 'size' has not be non-zero, we should enforce that if that really is the case
+@NonNullByDefault
 public record Indexed<E>(long index, E entry, int size) {
+    public Indexed {
+        requireNonNull(entry);
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("index", index).add("entry", entry).toString();
index 3d5ab7ade9883d07712be09c978cc2db40839168..d21d9051b6e6c97fc71467cf186df4f48ea0e353 100644 (file)
@@ -18,6 +18,7 @@ package io.atomix.storage.journal;
 
 import com.google.common.base.MoreObjects;
 import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
 import io.atomix.storage.journal.index.SparseJournalIndex;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -26,6 +27,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * Log segment.
@@ -37,7 +39,7 @@ final class JournalSegment<E> implements AutoCloseable {
   private final JournalSegmentDescriptor descriptor;
   private final StorageLevel storageLevel;
   private final int maxEntrySize;
-  private final JournalIndex index;
+  private final JournalIndex journalIndex;
   private final JournalSerdes namespace;
   private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
@@ -58,7 +60,7 @@ final class JournalSegment<E> implements AutoCloseable {
     this.storageLevel = storageLevel;
     this.maxEntrySize = maxEntrySize;
     this.namespace = namespace;
-    index = new SparseJournalIndex(indexDensity);
+    journalIndex = new SparseJournalIndex(indexDensity);
     try {
       channel = FileChannel.open(file.file().toPath(),
         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
@@ -66,8 +68,9 @@ final class JournalSegment<E> implements AutoCloseable {
       throw new StorageException(e);
     }
     writer = switch (storageLevel) {
-        case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
-        case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel();
+        case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
+        case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
+            .toFileChannel();
     };
   }
 
@@ -120,6 +123,16 @@ final class JournalSegment<E> implements AutoCloseable {
     return descriptor;
   }
 
+  /**
+   * Looks up the position of the given index.
+   *
+   * @param index the index to lookup
+   * @return the position of the given index or a lesser index, or {@code null}
+   */
+  @Nullable Position lookup(long index) {
+    return journalIndex.lookup(index);
+  }
+
   /**
    * Acquires a reference to the log segment.
    */
@@ -173,8 +186,9 @@ final class JournalSegment<E> implements AutoCloseable {
 
     final var buffer = writer.buffer();
     final var reader = buffer == null
-      ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
-        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
+      ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
+        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+    reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
     return reader;
   }
index 36cd6ef5c3dad6c14f6db3da68296f43bd3754a4..44c990f5a3b27884ef7e224211a418bbda3eb05f 100644 (file)
@@ -17,100 +17,17 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
 import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
     final int maxEntrySize;
-    private final JournalIndex index;
     final JournalSerdes namespace;
-    private final long firstIndex;
     private final JournalSegment<E> segment;
 
-    private Indexed<E> currentEntry;
-    private Indexed<E> nextEntry;
-
-    JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
-            final JournalSerdes namespace) {
+    JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
         this.segment = requireNonNull(segment);
         this.maxEntrySize = maxEntrySize;
-        this.index = requireNonNull(index);
         this.namespace = requireNonNull(namespace);
-        firstIndex = segment.index();
-    }
-
-    /**
-     * Returns the last read entry.
-     *
-     * @return The last read entry.
-     */
-    final Indexed<E> getCurrentEntry() {
-        return currentEntry;
-    }
-
-    /**
-     * Returns the next reader index.
-     *
-     * @return The next reader index.
-     */
-    final long getNextIndex() {
-        return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
-    }
-
-    /**
-     * Returns the next entry in the reader.
-     *
-     * @return The next entry in the reader, or {@code null}
-     */
-    final @Nullable Indexed<E> tryNext() {
-        if (nextEntry == null) {
-            nextEntry = readNext();
-        }
-        if (nextEntry == null) {
-            return null;
-        }
-
-        // Set the current entry to the next entry.
-        currentEntry = nextEntry;
-
-        // Reset the next entry to null.
-        nextEntry = null;
-
-        // Read the next entry in the segment.
-        nextEntry = readNext();
-
-        // Return the current entry.
-        return currentEntry;
-    }
-
-    /**
-     * Resets the reader to the start of the segment.
-     */
-    final void reset() {
-        currentEntry = null;
-        nextEntry = null;
-        setPosition(JournalSegmentDescriptor.BYTES);
-        nextEntry = readNext();
-    }
-
-    /**
-     * Resets the reader to the given index.
-     *
-     * @param index The index to which to reset the reader.
-     */
-    final void reset(final long index) {
-        reset();
-        Position position = this.index.lookup(index - 1);
-        if (position != null) {
-            // FIXME: why do we need a 'null'-based entry here?
-            currentEntry = new Indexed<>(position.index() - 1, null, 0);
-            setPosition(position.position());
-            nextEntry = readNext();
-        }
-        while (getNextIndex() < index && tryNext() != null) {
-            // Nothing else
-        }
     }
 
     /**
@@ -134,9 +51,4 @@ abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader,
      * @return The entry, or {@code null}
      */
     abstract @Nullable Indexed<E> readEntry(long index);
-
-    private @Nullable Indexed<E> readNext() {
-        // Compute the index of the next entry in the segment.
-        return readEntry(getNextIndex());
-    }
 }
index 44008774661a0047e16328c6ec53e79efefc5eb2..bbf100740a50d25af843dad80250c153f6013163 100644 (file)
@@ -16,7 +16,6 @@
  */
 package io.atomix.storage.journal;
 
-import io.atomix.storage.journal.index.JournalIndex;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
@@ -33,11 +32,9 @@ final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
       ByteBuffer buffer,
       JournalSegment<E> segment,
       int maxEntrySize,
-      JournalIndex index,
       JournalSerdes namespace) {
-    super(segment, maxEntrySize, index, namespace);
+    super(segment, maxEntrySize, namespace);
     this.buffer = buffer.slice();
-    reset();
   }
 
   @Override
index 50270442e79226e46334c1abcff9e335efe7384f..de57d520e2896eb164707b1504eecdc83dc30844 100644 (file)
@@ -22,125 +22,127 @@ import static java.util.Objects.requireNonNull;
  * A {@link JournalReader} traversing all entries.
  */
 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
-  final SegmentedJournal<E> journal;
-  private JournalSegment<E> currentSegment;
-  private Indexed<E> previousEntry;
-  private JournalSegmentReader<E> currentReader;
-
-  SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
-    this.journal = requireNonNull(journal);
-    currentSegment = requireNonNull(segment);
-    currentReader = segment.createReader();
-  }
-
-  @Override
-  public final long getFirstIndex() {
-    return journal.getFirstSegment().index();
-  }
-
-  @Override
-  public final long getCurrentIndex() {
-    final var currentEntry = currentReader.getCurrentEntry();
-    if (currentEntry != null) {
-      final long currentIndex = currentEntry.index();
-      if (currentIndex != 0) {
-        return currentIndex;
-      }
+    final SegmentedJournal<E> journal;
+
+    private JournalSegment<E> currentSegment;
+    private JournalSegmentReader<E> currentReader;
+    private Indexed<E> currentEntry;
+    private long nextIndex;
+
+    SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
+        this.journal = requireNonNull(journal);
+        currentSegment = requireNonNull(segment);
+        currentReader = segment.createReader();
+        nextIndex = currentSegment.index();
+        currentEntry = null;
     }
-    return previousEntry != null ? previousEntry.index() : 0;
-  }
-
-  @Override
-  public final Indexed<E> getCurrentEntry() {
-    // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
-    // That segment may be empty, though, in which case we need to report the previousEntry.
-    final Indexed<E> currentEntry;
-    return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
-  }
-
-  @Override
-  public final long getNextIndex() {
-    return currentReader.getNextIndex();
-  }
-
-  @Override
-  public final void reset() {
-    previousEntry = null;
-    currentReader.close();
-
-    currentSegment = journal.getFirstSegment();
-    currentReader = currentSegment.createReader();
-  }
-
-  @Override
-  public final void reset(long index) {
-    // If the current segment is not open, it has been replaced. Reset the segments.
-    if (!currentSegment.isOpen()) {
-      reset();
+
+    @Override
+    public final long getFirstIndex() {
+        return journal.getFirstSegment().index();
     }
 
-    final var nextIndex = currentReader.getNextIndex();
-    if (index < nextIndex) {
-      rewind(index);
-    } else if (index > nextIndex) {
-      forward(index);
-    } else {
-      currentReader.reset(index);
+    @Override
+    public final long getCurrentIndex() {
+        return currentEntry != null ? currentEntry.index() : 0;
     }
-  }
-
-  /**
-   * Rewinds the journal to the given index.
-   */
-  private void rewind(long index) {
-    if (currentSegment.index() >= index) {
-      JournalSegment<E> segment = journal.getSegment(index - 1);
-      if (segment != null) {
-        currentReader.close();
 
-        currentSegment = segment;
-        currentReader = currentSegment.createReader();
-      }
+    @Override
+    public final Indexed<E> getCurrentEntry() {
+        return currentEntry;
     }
 
-    currentReader.reset(index);
-    previousEntry = currentReader.getCurrentEntry();
-  }
+    @Override
+    public final long getNextIndex() {
+        return nextIndex;
+    }
+
+    @Override
+    public final void reset() {
+        currentReader.close();
 
-  /**
-   * Fast forwards the journal to the given index.
-   */
-  private void forward(long index) {
-    while (getNextIndex() < index && tryNext() != null) {
-      // Nothing else
+        currentSegment = journal.getFirstSegment();
+        currentReader = currentSegment.createReader();
+        nextIndex = currentSegment.index();
+        currentEntry = null;
     }
-  }
-
-  @Override
-  public Indexed<E> tryNext() {
-    final var current = currentReader.getCurrentEntry();
-    final var next = currentReader.tryNext();
-    if (next != null) {
-      previousEntry = current;
-      return next;
+
+    @Override
+    public final void reset(final long index) {
+        // If the current segment is not open, it has been replaced. Reset the segments.
+        if (!currentSegment.isOpen()) {
+            reset();
+        }
+
+        if (index < nextIndex) {
+            rewind(index);
+        } else if (index > nextIndex) {
+            while (index > nextIndex && tryNext() != null) {
+                // Nothing else
+            }
+        } else {
+            resetCurrentReader(index);
+        }
     }
 
-    final var nextSegment = journal.getNextSegment(currentSegment.index());
-    if (nextSegment == null || nextSegment.index() != getNextIndex()) {
-      return null;
+    private void resetCurrentReader(final long index) {
+        final var position = currentSegment.lookup(index - 1);
+        if (position != null) {
+            nextIndex = position.index();
+            currentReader.setPosition(position.position());
+        } else {
+            nextIndex = currentSegment.index();
+            currentReader.setPosition(JournalSegmentDescriptor.BYTES);
+        }
+        while (nextIndex < index && tryNext() != null) {
+            // Nothing else
+        }
     }
 
-    previousEntry = currentReader.getCurrentEntry();
-    currentReader.close();
+    /**
+     * Rewinds the journal to the given index.
+     */
+    private void rewind(final long index) {
+        if (currentSegment.index() >= index) {
+            JournalSegment<E> segment = journal.getSegment(index - 1);
+            if (segment != null) {
+                currentReader.close();
+
+                currentSegment = segment;
+                currentReader = currentSegment.createReader();
+            }
+        }
+
+        resetCurrentReader(index);
+    }
 
-    currentSegment = nextSegment;
-    currentReader = currentSegment.createReader();
-    return currentReader.tryNext();
-  }
+    @Override
+    public Indexed<E> tryNext() {
+        var next = currentReader.readEntry(nextIndex);
+        if (next == null) {
+            final var nextSegment = journal.getNextSegment(currentSegment.index());
+            if (nextSegment == null || nextSegment.index() != nextIndex) {
+                return null;
+            }
+
+            currentReader.close();
+
+            currentSegment = nextSegment;
+            currentReader = currentSegment.createReader();
+            next = currentReader.readEntry(nextIndex);
+            if (next == null) {
+                return null;
+            }
+        }
+
+        nextIndex = nextIndex + 1;
+        currentEntry = next;
+        return next;
+    }
 
-  @Override
-  public final void close() {
-    currentReader.close();
-    journal.closeReader(this);
-  }
+    @Override
+    public final void close() {
+        currentReader.close();
+        journal.closeReader(this);
+    }
 }