Optimize SegmentedJournalReader.hasNext() 81/110681/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 12 Mar 2024 19:47:54 +0000 (20:47 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 13 Mar 2024 07:28:57 +0000 (08:28 +0100)
hasNext() is an extremely hot method and its behaviour depends on
JournalReader.Mode, which is invariant for a particular instance.

Expose the invariant to the JVM by splitting the class into the base,
which services Mode.ALL and a subclass, which performs the commit index
check.

Also move segment acquisition and reader positioning to
SegmentedJournal, as otherwise we would be calling hasNext() from
constructor, which is getting flagged by SpotBugs.

JIRA: CONTROLLER-2106
Change-Id: I29fca69a211fc8f21619a54fe59a8ff4012b01d0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java

diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java
new file mode 100644 (file)
index 0000000..8dfb14f
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+/**
+ * A {@link JournalReader} traversing only committed entries.
+ */
+final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
+    CommitsSegmentJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
+        super(journal, segment);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return getNextIndex() <= journal.getCommitIndex() && super.hasNext();
+    }
+}
index 865daeee528574d17acaf2289f3a8f3b03d6018f..7da4655a48591d1192be4be4a6f6419f0c2e9371 100644 (file)
@@ -210,7 +210,17 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @return The Raft log reader.
    */
   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
-    final var reader = new SegmentedJournalReader<>(this, index, mode);
+    final var segment = getSegment(index);
+    final var reader = switch (mode) {
+      case ALL -> new SegmentedJournalReader<>(this, segment);
+      case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
+    };
+
+    // Forward reader to specified index
+    for (long next = reader.getNextIndex(); index > next && reader.hasNext(); next = reader.getNextIndex()) {
+      reader.next();
+    }
+
     readers.add(reader);
     return reader;
   }
index 32d72176c33bdf16d049df22c34dadd4f6a22866..3022b5190324b8ba7d25076aab1e9bbdbce7a854 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static java.util.Objects.requireNonNull;
+
 import java.util.NoSuchElementException;
 
 /**
- * Raft log reader.
+ * A {@link JournalReader} traversing all entries.
  */
-final class SegmentedJournalReader<E> implements JournalReader<E> {
-  private final SegmentedJournal<E> journal;
+sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
+  final SegmentedJournal<E> journal;
   private JournalSegment<E> currentSegment;
   private Indexed<E> previousEntry;
   private JournalSegmentReader<E> currentReader;
-  private final Mode mode;
-
-  SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
-    this.journal = journal;
-    this.mode = mode;
-    currentSegment = journal.getSegment(index);
-    currentReader = currentSegment.createReader();
 
-    long nextIndex = getNextIndex();
-    while (index > nextIndex && hasNext()) {
-      next();
-      nextIndex = getNextIndex();
-    }
+  SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
+    this.journal = requireNonNull(journal);
+    currentSegment = requireNonNull(segment);
+    currentReader = segment.createReader();
   }
 
   @Override
-  public long getFirstIndex() {
+  public final long getFirstIndex() {
     return journal.getFirstSegment().index();
   }
 
   @Override
-  public long getCurrentIndex() {
+  public final long getCurrentIndex() {
     long currentIndex = currentReader.getCurrentIndex();
     if (currentIndex != 0) {
       return currentIndex;
@@ -58,7 +52,7 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
   }
 
   @Override
-  public Indexed<E> getCurrentEntry() {
+  public final Indexed<E> getCurrentEntry() {
     Indexed<E> currentEntry = currentReader.getCurrentEntry();
     if (currentEntry != null) {
       return currentEntry;
@@ -67,12 +61,12 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
   }
 
   @Override
-  public long getNextIndex() {
+  public final long getNextIndex() {
     return currentReader.getNextIndex();
   }
 
   @Override
-  public void reset() {
+  public final void reset() {
     previousEntry = null;
     currentReader.close();
 
@@ -81,7 +75,7 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
   }
 
   @Override
-  public void reset(long index) {
+  public final void reset(long index) {
     // If the current segment is not open, it has been replaced. Reset the segments.
     if (!currentSegment.isOpen()) {
       reset();
@@ -125,24 +119,11 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
 
   @Override
   public boolean hasNext() {
-    if (mode == Mode.ALL) {
-      return hasNextEntry();
-    }
-
-    long nextIndex = getNextIndex();
-    long commitIndex = journal.getCommitIndex();
-    return nextIndex <= commitIndex && hasNextEntry();
-  }
-
-  private boolean hasNextEntry() {
-    if (currentReader.hasNext()) {
-      return true;
-    }
-    return moveToNextSegment() ? currentReader.hasNext() : false;
+    return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext();
   }
 
   @Override
-  public Indexed<E> next() {
+  public final Indexed<E> next() {
     if (currentReader.hasNext()) {
       previousEntry = currentReader.getCurrentEntry();
       return currentReader.next();
@@ -154,7 +135,7 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
   }
 
   @Override
-  public void close() {
+  public final void close() {
     currentReader.close();
     journal.closeReader(this);
   }