JournalReader is not an Iterator 94/110694/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 13 Mar 2024 12:32:07 +0000 (13:32 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 13 Mar 2024 13:06:56 +0000 (14:06 +0100)
Iterator dictating a two method API, which is not quite appropriate, as
the writer could be manipulated between hasNext() and next() methods --
and next() has to perform validation again.

Introduce JournalReader.tryNext(), which returns a @Nullable Indexed --
either the next entry (as next()), or null (indicating !hasNext() case).

JIRA: CONTROLLER-2106
Change-Id: Ie0338a9869ece8e6381ae719a29e97fd569b442f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java
atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java

index 374eed901b1d40b6119f433a6c33029613caa9b9..ac80fed96b75f44a7abb21d36275a5d371eef3b3 100644 (file)
@@ -15,8 +15,6 @@
  */
 package io.atomix.storage.journal;
 
-import java.util.NoSuchElementException;
-
 /**
  * A {@link JournalReader} traversing only committed entries.
  */
@@ -26,19 +24,7 @@ final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
     }
 
     @Override
-    public boolean hasNext() {
-        return isNextCommited() && super.hasNext();
-    }
-
-    @Override
-    public Indexed<E> next() {
-        if (isNextCommited()) {
-            return super.next();
-        }
-        throw new NoSuchElementException();
-    }
-
-    private boolean isNextCommited() {
-        return getNextIndex() <= journal.getCommitIndex();
+    public Indexed<E> tryNext() {
+        return getNextIndex() <= journal.getCommitIndex() ? super.tryNext() : null;
     }
 }
index 730ab205b852d80fb24af3878219485733dd82d0..6df56cacc7dac47d7c61babc52d65c8174c01d75 100644 (file)
  */
 package io.atomix.storage.journal;
 
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * Log reader.
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-public interface JournalReader<E> extends Iterator<Indexed<E>>, AutoCloseable {
-
-  /**
-   * Raft log reader mode.
-   */
-  enum Mode {
-
+public interface JournalReader<E> extends AutoCloseable {
     /**
-     * Reads all entries from the log.
+     * Raft log reader mode.
      */
-    ALL,
+    enum Mode {
+        /**
+         * Reads all entries from the log.
+         */
+        ALL,
+        /**
+         * Reads committed entries from the log.
+         */
+        COMMITS,
+    }
 
     /**
-     * Reads committed entries from the log.
+     * Returns the first index in the journal.
+     *
+     * @return the first index in the journal
      */
-    COMMITS,
-  }
-
-  /**
-   * Returns the first index in the journal.
-   *
-   * @return the first index in the journal
-   */
-  long getFirstIndex();
-
-  /**
-   * Returns the current reader index.
-   *
-   * @return The current reader index.
-   */
-  long getCurrentIndex();
+    long getFirstIndex();
 
-  /**
-   * Returns the last read entry.
-   *
-   * @return The last read entry.
-   */
-  Indexed<E> getCurrentEntry();
+    /**
+     * Returns the current reader index.
+     *
+     * @return The current reader index.
+     */
+    long getCurrentIndex();
 
-  /**
-   * Returns the next reader index.
-   *
-   * @return The next reader index.
-   */
-  long getNextIndex();
+    /**
+     * Returns the last read entry.
+     *
+     * @return The last read entry.
+     */
+    Indexed<E> getCurrentEntry();
 
-  /**
-   * Returns whether the reader has a next entry to read.
-   *
-   * @return Whether the reader has a next entry to read.
-   */
-  @Override
-  boolean hasNext();
+    /**
+     * Returns the next reader index.
+     *
+     * @return The next reader index.
+     */
+    long getNextIndex();
 
-  /**
-   * Returns the next entry in the reader.
-   *
-   * @return The next entry in the reader.
-   * @throws NoSuchElementException there is no next entry
-   */
-  @Override
-  Indexed<E> next();
+    /**
+     * Try to move to the next entry.
+     *
+     * @return The next entry in the reader, or {@code null} if there is no next entry.
+     */
+    @Nullable Indexed<E> tryNext();
 
-  /**
-   * Resets the reader to the start.
-   */
-  void reset();
+    /**
+     * Resets the reader to the start.
+     */
+    void reset();
 
-  /**
-   * Resets the reader to the given index.
-   *
-   * @param index The index to which to reset the reader.
-   */
-  void reset(long index);
+    /**
+     * Resets the reader to the given index.
+     *
+     * @param index The index to which to reset the reader.
+     */
+    void reset(long index);
 
-  @Override
-  void close();
+    @Override
+    void close();
 }
index 7da4655a48591d1192be4be4a6f6419f0c2e9371..db79e00f43a688729193ab6f85b049d1f9b4064c 100644 (file)
@@ -217,8 +217,9 @@ public final class SegmentedJournal<E> implements Journal<E> {
     };
 
     // Forward reader to specified index
-    for (long next = reader.getNextIndex(); index > next && reader.hasNext(); next = reader.getNextIndex()) {
-      reader.next();
+    long next = reader.getNextIndex();
+    while (index > next && reader.tryNext() != null) {
+        next = reader.getNextIndex();
     }
 
     readers.add(reader);
index b77b9def430ceb5a8d6ff37a98c3b8fa3940cd82..6bd230847511c05403c606801fc88c97391dbd17 100644 (file)
@@ -17,8 +17,6 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.NoSuchElementException;
-
 /**
  * A {@link JournalReader} traversing all entries.
  */
@@ -112,38 +110,21 @@ sealed class SegmentedJournalReader<E> implements JournalReader<E> permits Commi
    * Fast forwards the journal to the given index.
    */
   private void forward(long index) {
-    while (getNextIndex() < index && hasNext()) {
-      next();
+    while (getNextIndex() < index && tryNext() != null) {
+      // Nothing else
     }
   }
 
   @Override
-  public boolean hasNext() {
-    return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext();
-  }
-
-  @Override
-  public Indexed<E> next() {
+  public Indexed<E> tryNext() {
     if (currentReader.hasNext()) {
       previousEntry = currentReader.getCurrentEntry();
       return currentReader.next();
     }
-    if (moveToNextSegment()) {
-      return currentReader.next();
-    }
-    throw new NoSuchElementException();
-  }
 
-  @Override
-  public final void close() {
-    currentReader.close();
-    journal.closeReader(this);
-  }
-
-  private boolean moveToNextSegment() {
     final var nextSegment = journal.getNextSegment(currentSegment.index());
     if (nextSegment == null || nextSegment.index() != getNextIndex()) {
-      return false;
+      return null;
     }
 
     previousEntry = currentReader.getCurrentEntry();
@@ -151,6 +132,12 @@ sealed class SegmentedJournalReader<E> implements JournalReader<E> permits Commi
 
     currentSegment = nextSegment;
     currentReader = currentSegment.createReader();
-    return true;
+    return currentReader.hasNext() ? currentReader.next() : null;
+  }
+
+  @Override
+  public final void close() {
+    currentReader.close();
+    journal.closeReader(this);
   }
 }
index 54f6e6d5d80765e04d4a383134ce223138c75898..a82d9449b2f6eaebcd71ba76229da598e44c7b85 100644 (file)
 package io.atomix.storage.journal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -32,7 +29,6 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NoSuchElementException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -106,79 +102,78 @@ public abstract class AbstractJournalTest {
             JournalReader<TestEntry> reader = journal.openReader(1);
 
             // Append a couple entries.
-            Indexed<TestEntry> indexed;
             assertEquals(1, writer.getNextIndex());
-            indexed = writer.append(ENTRY);
+            var indexed = writer.append(ENTRY);
             assertEquals(1, indexed.index());
 
             assertEquals(2, writer.getNextIndex());
             writer.append(ENTRY);
             reader.reset(2);
-            indexed = reader.next();
+            indexed = reader.tryNext();
+            assertNotNull(indexed);
             assertEquals(2, indexed.index());
-            assertFalse(reader.hasNext());
+            assertNull(reader.tryNext());
 
             // Test reading an entry
-            Indexed<TestEntry> entry1;
             reader.reset();
-            entry1 = reader.next();
+            var entry1 = reader.tryNext();
+            assertNotNull(entry1);
             assertEquals(1, entry1.index());
             assertEquals(entry1, reader.getCurrentEntry());
             assertEquals(1, reader.getCurrentIndex());
 
             // Test reading a second entry
-            Indexed<TestEntry> entry2;
-            assertTrue(reader.hasNext());
             assertEquals(2, reader.getNextIndex());
-            entry2 = reader.next();
+            var entry2 = reader.tryNext();
+            assertNotNull(entry2);
             assertEquals(2, entry2.index());
             assertEquals(entry2, reader.getCurrentEntry());
             assertEquals(2, reader.getCurrentIndex());
-            assertFalse(reader.hasNext());
+            assertEquals(3, reader.getNextIndex());
+            assertNull(reader.tryNext());
 
             // Test opening a new reader and reading from the journal.
             reader = journal.openReader(1);
-            assertTrue(reader.hasNext());
-            entry1 = reader.next();
+            entry1 = reader.tryNext();
+            assertNotNull(entry1);
             assertEquals(1, entry1.index());
             assertEquals(entry1, reader.getCurrentEntry());
             assertEquals(1, reader.getCurrentIndex());
-            assertTrue(reader.hasNext());
 
-            assertTrue(reader.hasNext());
             assertEquals(2, reader.getNextIndex());
-            entry2 = reader.next();
+            entry2 = reader.tryNext();
+            assertNotNull(entry2);
             assertEquals(2, entry2.index());
             assertEquals(entry2, reader.getCurrentEntry());
             assertEquals(2, reader.getCurrentIndex());
-            assertFalse(reader.hasNext());
+            assertNull(reader.tryNext());
 
             // Reset the reader.
             reader.reset();
 
             // Test opening a new reader and reading from the journal.
             reader = journal.openReader(1);
-            assertTrue(reader.hasNext());
-            entry1 = reader.next();
+            entry1 = reader.tryNext();
+            assertNotNull(entry1);
             assertEquals(1, entry1.index());
             assertEquals(entry1, reader.getCurrentEntry());
             assertEquals(1, reader.getCurrentIndex());
-            assertTrue(reader.hasNext());
 
-            assertTrue(reader.hasNext());
             assertEquals(2, reader.getNextIndex());
-            entry2 = reader.next();
+            entry2 = reader.tryNext();
+            assertNotNull(entry2);
             assertEquals(2, entry2.index());
             assertEquals(entry2, reader.getCurrentEntry());
             assertEquals(2, reader.getCurrentIndex());
-            assertFalse(reader.hasNext());
+            assertNull(reader.tryNext());
 
             // Truncate the journal and write a different entry.
             writer.truncate(1);
             assertEquals(2, writer.getNextIndex());
             writer.append(ENTRY);
             reader.reset(2);
-            indexed = reader.next();
+            indexed = reader.tryNext();
+            assertNotNull(indexed);
             assertEquals(2, indexed.index());
 
             // Reset the reader to a specific index and read the last entry again.
@@ -187,13 +182,13 @@ public abstract class AbstractJournalTest {
             assertNotNull(reader.getCurrentEntry());
             assertEquals(1, reader.getCurrentIndex());
             assertEquals(1, reader.getCurrentEntry().index());
-            assertTrue(reader.hasNext());
             assertEquals(2, reader.getNextIndex());
-            entry2 = reader.next();
+            entry2 = reader.tryNext();
+            assertNotNull(entry2);
             assertEquals(2, entry2.index());
             assertEquals(entry2, reader.getCurrentEntry());
             assertEquals(2, reader.getCurrentIndex());
-            assertFalse(reader.hasNext());
+            assertNull(reader.tryNext());
         }
     }
 
@@ -209,15 +204,19 @@ public abstract class AbstractJournalTest {
             writer.reset(1);
             assertEquals(0, writer.getLastIndex());
             writer.append(ENTRY);
-            assertEquals(1, reader.next().index());
+
+            var indexed = reader.tryNext();
+            assertNotNull(indexed);
+            assertEquals(1, indexed.index());
             writer.reset(1);
             assertEquals(0, writer.getLastIndex());
             writer.append(ENTRY);
             assertEquals(1, writer.getLastIndex());
             assertEquals(1, writer.getLastEntry().index());
 
-            assertTrue(reader.hasNext());
-            assertEquals(1, reader.next().index());
+            indexed = reader.tryNext();
+            assertNotNull(indexed);
+            assertEquals(1, indexed.index());
 
             writer.truncate(0);
             assertEquals(0, writer.getLastIndex());
@@ -226,8 +225,9 @@ public abstract class AbstractJournalTest {
             assertEquals(1, writer.getLastIndex());
             assertEquals(1, writer.getLastEntry().index());
 
-            assertTrue(reader.hasNext());
-            assertEquals(1, reader.next().index());
+            indexed = reader.tryNext();
+            assertNotNull(indexed);
+            assertEquals(1, indexed.index());
         }
     }
 
@@ -243,21 +243,22 @@ public abstract class AbstractJournalTest {
             }
 
             for (int j = 1; j <= i - 2; j++) {
-                assertTrue(reader.hasNext());
-                assertEquals(j, reader.next().index());
+                final var indexed = reader.tryNext();
+                assertNotNull(indexed);
+                assertEquals(j, indexed.index());
             }
 
             writer.truncate(i - 2);
 
-            assertFalse(reader.hasNext());
+            assertNull(reader.tryNext());
             assertEquals(i - 1, writer.append(new TestEntry(32)).index());
             assertEquals(i, writer.append(new TestEntry(32)).index());
 
-            assertTrue(reader.hasNext());
-            Indexed<TestEntry> entry = reader.next();
+            Indexed<TestEntry> entry = reader.tryNext();
+            assertNotNull(entry);
             assertEquals(i - 1, entry.index());
-            assertTrue(reader.hasNext());
-            entry = reader.next();
+            entry = reader.tryNext();
+            assertNotNull(entry);
             assertEquals(i, entry.index());
         }
     }
@@ -270,13 +271,13 @@ public abstract class AbstractJournalTest {
 
             for (int i = 1; i <= entriesPerSegment * 5; i++) {
                 writer.append(ENTRY);
-                assertTrue(reader.hasNext());
-                Indexed<TestEntry> entry;
-                entry = reader.next();
+                var entry = reader.tryNext();
+                assertNotNull(entry);
                 assertEquals(i, entry.index());
                 assertEquals(32, entry.entry().bytes().length);
                 reader.reset(i);
-                entry = reader.next();
+                entry = reader.tryNext();
+                assertNotNull(entry);
                 assertEquals(i, entry.index());
                 assertEquals(32, entry.entry().bytes().length);
 
@@ -292,10 +293,10 @@ public abstract class AbstractJournalTest {
                 writer.truncate(i - 1);
                 writer.append(ENTRY);
 
-                assertTrue(reader.hasNext());
+                assertNotNull(reader.tryNext());
                 reader.reset(i);
-                assertTrue(reader.hasNext());
-                entry = reader.next();
+                entry = reader.tryNext();
+                assertNotNull(entry);
                 assertEquals(i, entry.index());
                 assertEquals(32, entry.entry().bytes().length);
             }
@@ -310,37 +311,15 @@ public abstract class AbstractJournalTest {
 
             for (int i = 1; i <= entriesPerSegment * 5; i++) {
                 writer.append(ENTRY);
-                assertFalse(reader.hasNext());
-                writer.commit(i);
-                assertTrue(reader.hasNext());
-                Indexed<TestEntry> entry;
-                entry = reader.next();
-                assertEquals(i, entry.index());
-                assertEquals(32, entry.entry().bytes().length);
-                reader.reset(i);
-                entry = reader.next();
-                assertEquals(i, entry.index());
-                assertEquals(32, entry.entry().bytes().length);
-            }
-        }
-    }
-
-    // Same as testWriteReadCommittedEntries(), but does not use hasNext() but checks whether an exception is thrown
-    @Test
-    public void testWriteReadCommittedEntriesException() throws Exception {
-        try (Journal<TestEntry> journal = createJournal()) {
-            JournalWriter<TestEntry> writer = journal.writer();
-            JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
-
-            for (int i = 1; i <= entriesPerSegment * 5; i++) {
-                writer.append(ENTRY);
-                assertThrows(NoSuchElementException.class, reader::next);
+                assertNull(reader.tryNext());
                 writer.commit(i);
-                Indexed<TestEntry> entry = reader.next();
+                var entry = reader.tryNext();
+                assertNotNull(entry);
                 assertEquals(i, entry.index());
                 assertEquals(32, entry.entry().bytes().length);
                 reader.reset(i);
-                entry = reader.next();
+                entry = reader.tryNext();
+                assertNotNull(entry);
                 assertEquals(i, entry.index());
                 assertEquals(32, entry.entry().bytes().length);
             }
@@ -359,33 +338,46 @@ public abstract class AbstractJournalTest {
             }
 
             assertEquals(1, uncommittedReader.getNextIndex());
-            assertTrue(uncommittedReader.hasNext());
             assertEquals(1, committedReader.getNextIndex());
-            assertFalse(committedReader.hasNext());
+
+            // This creates asymmetry, as uncommitted reader will move one step ahead...
+            assertNotNull(uncommittedReader.tryNext());
+            assertEquals(2, uncommittedReader.getNextIndex());
+            assertNull(committedReader.tryNext());
+            assertEquals(1, committedReader.getNextIndex());
 
             writer.commit(entriesPerSegment * 9);
 
-            assertTrue(uncommittedReader.hasNext());
-            assertTrue(committedReader.hasNext());
+            // ... so here we catch up ...
+            assertNotNull(committedReader.tryNext());
+            assertEquals(2, committedReader.getNextIndex());
+
+            // ... and continue from the second entry
+            for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
+                var entry = uncommittedReader.tryNext();
+                assertNotNull(entry);
+                assertEquals(i, entry.index());
 
-            for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
-                assertEquals(i, uncommittedReader.next().index());
-                assertEquals(i, committedReader.next().index());
+                entry = committedReader.tryNext();
+                assertNotNull(entry);
+                assertEquals(i, entry.index());
             }
 
             journal.compact(entriesPerSegment * 5 + 1);
 
             assertNull(uncommittedReader.getCurrentEntry());
             assertEquals(0, uncommittedReader.getCurrentIndex());
-            assertTrue(uncommittedReader.hasNext());
             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
-            assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
+            var entry = uncommittedReader.tryNext();
+            assertNotNull(entry);
+            assertEquals(entriesPerSegment * 5 + 1, entry.index());
 
             assertNull(committedReader.getCurrentEntry());
             assertEquals(0, committedReader.getCurrentIndex());
-            assertTrue(committedReader.hasNext());
             assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
-            assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
+            entry = committedReader.tryNext();
+            assertNotNull(entry);
+            assertEquals(entriesPerSegment * 5 + 1, entry.index());
         }
     }
 
@@ -417,10 +409,10 @@ public abstract class AbstractJournalTest {
             // Ensure the reader starts at the first physical index in the journal.
             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
-            assertTrue(reader.hasNext());
-            assertEquals(entriesPerSegment + 1, reader.getNextIndex());
-            assertEquals(reader.getFirstIndex(), reader.getNextIndex());
-            assertEquals(entriesPerSegment + 1, reader.next().index());
+            final var indexed = reader.tryNext();
+            assertNotNull(indexed);
+            assertEquals(entriesPerSegment + 1, indexed.index());
+            assertEquals(entriesPerSegment + 2, reader.getNextIndex());
         }
     }
 
index 4351fd40ac8d49813d30043b438340db920abd7c..24e8fec03aa6168e378a3b26ec136f452aea0dfa 100644 (file)
@@ -82,9 +82,9 @@ final class DataJournalV0 extends DataJournal {
 
     private void handleReplayMessages(final JournalReader<DataJournalEntry> reader, final ReplayMessages message) {
         int count = 0;
-        while (reader.hasNext() && count < message.max) {
-            final var next = reader.next();
-            if (next.index() > message.toSequenceNr) {
+        while (count < message.max) {
+            final var next = reader.tryNext();
+            if (next == null || next.index() > message.toSequenceNr) {
                 break;
             }