From: Robert Varga Date: Wed, 13 Mar 2024 12:32:07 +0000 (+0100) Subject: JournalReader is not an Iterator X-Git-Tag: v9.0.1~14 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cfe137aaad17ccc691f25ce517bf83a765216b63 JournalReader is not an Iterator Iterator dictating a two method API, which is not quite appropriate, as the writer could be manipulated between hasNext() and next() methods -- and next() has to perform validation again. Introduce JournalReader.tryNext(), which returns a @Nullable Indexed -- either the next entry (as next()), or null (indicating !hasNext() case). JIRA: CONTROLLER-2106 Change-Id: Ie0338a9869ece8e6381ae719a29e97fd569b442f Signed-off-by: Robert Varga --- diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java index 374eed901b..ac80fed96b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java @@ -15,8 +15,6 @@ */ package io.atomix.storage.journal; -import java.util.NoSuchElementException; - /** * A {@link JournalReader} traversing only committed entries. */ @@ -26,19 +24,7 @@ final class CommitsSegmentJournalReader extends SegmentedJournalReader { } @Override - public boolean hasNext() { - return isNextCommited() && super.hasNext(); - } - - @Override - public Indexed next() { - if (isNextCommited()) { - return super.next(); - } - throw new NoSuchElementException(); - } - - private boolean isNextCommited() { - return getNextIndex() <= journal.getCommitIndex(); + public Indexed tryNext() { + return getNextIndex() <= journal.getCommitIndex() ? super.tryNext() : null; } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java index 730ab205b8..6df56cacc7 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -15,89 +15,75 @@ */ package io.atomix.storage.journal; -import java.util.Iterator; -import java.util.NoSuchElementException; +import org.eclipse.jdt.annotation.Nullable; /** * Log reader. * * @author Jordan Halterman */ -public interface JournalReader extends Iterator>, AutoCloseable { - - /** - * Raft log reader mode. - */ - enum Mode { - +public interface JournalReader extends AutoCloseable { /** - * Reads all entries from the log. + * Raft log reader mode. */ - ALL, + enum Mode { + /** + * Reads all entries from the log. + */ + ALL, + /** + * Reads committed entries from the log. + */ + COMMITS, + } /** - * Reads committed entries from the log. + * Returns the first index in the journal. + * + * @return the first index in the journal */ - COMMITS, - } - - /** - * Returns the first index in the journal. - * - * @return the first index in the journal - */ - long getFirstIndex(); - - /** - * Returns the current reader index. - * - * @return The current reader index. - */ - long getCurrentIndex(); + long getFirstIndex(); - /** - * Returns the last read entry. - * - * @return The last read entry. - */ - Indexed getCurrentEntry(); + /** + * Returns the current reader index. + * + * @return The current reader index. + */ + long getCurrentIndex(); - /** - * Returns the next reader index. - * - * @return The next reader index. - */ - long getNextIndex(); + /** + * Returns the last read entry. + * + * @return The last read entry. + */ + Indexed getCurrentEntry(); - /** - * Returns whether the reader has a next entry to read. - * - * @return Whether the reader has a next entry to read. - */ - @Override - boolean hasNext(); + /** + * Returns the next reader index. + * + * @return The next reader index. + */ + long getNextIndex(); - /** - * Returns the next entry in the reader. - * - * @return The next entry in the reader. - * @throws NoSuchElementException there is no next entry - */ - @Override - Indexed next(); + /** + * Try to move to the next entry. + * + * @return The next entry in the reader, or {@code null} if there is no next entry. + */ + @Nullable Indexed tryNext(); - /** - * Resets the reader to the start. - */ - void reset(); + /** + * Resets the reader to the start. + */ + void reset(); - /** - * Resets the reader to the given index. - * - * @param index The index to which to reset the reader. - */ - void reset(long index); + /** + * Resets the reader to the given index. + * + * @param index The index to which to reset the reader. + */ + void reset(long index); - @Override - void close(); + @Override + void close(); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 7da4655a48..db79e00f43 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -217,8 +217,9 @@ public final class SegmentedJournal implements Journal { }; // Forward reader to specified index - for (long next = reader.getNextIndex(); index > next && reader.hasNext(); next = reader.getNextIndex()) { - reader.next(); + long next = reader.getNextIndex(); + while (index > next && reader.tryNext() != null) { + next = reader.getNextIndex(); } readers.add(reader); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index b77b9def43..6bd2308475 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -17,8 +17,6 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; -import java.util.NoSuchElementException; - /** * A {@link JournalReader} traversing all entries. */ @@ -112,38 +110,21 @@ sealed class SegmentedJournalReader implements JournalReader permits Commi * Fast forwards the journal to the given index. */ private void forward(long index) { - while (getNextIndex() < index && hasNext()) { - next(); + while (getNextIndex() < index && tryNext() != null) { + // Nothing else } } @Override - public boolean hasNext() { - return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext(); - } - - @Override - public Indexed next() { + public Indexed tryNext() { if (currentReader.hasNext()) { previousEntry = currentReader.getCurrentEntry(); return currentReader.next(); } - if (moveToNextSegment()) { - return currentReader.next(); - } - throw new NoSuchElementException(); - } - @Override - public final void close() { - currentReader.close(); - journal.closeReader(this); - } - - private boolean moveToNextSegment() { final var nextSegment = journal.getNextSegment(currentSegment.index()); if (nextSegment == null || nextSegment.index() != getNextIndex()) { - return false; + return null; } previousEntry = currentReader.getCurrentEntry(); @@ -151,6 +132,12 @@ sealed class SegmentedJournalReader implements JournalReader permits Commi currentSegment = nextSegment; currentReader = currentSegment.createReader(); - return true; + return currentReader.hasNext() ? currentReader.next() : null; + } + + @Override + public final void close() { + currentReader.close(); + journal.closeReader(this); } } diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java index 54f6e6d5d8..a82d9449b2 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java @@ -17,11 +17,8 @@ package io.atomix.storage.journal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -32,7 +29,6 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.List; -import java.util.NoSuchElementException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -106,79 +102,78 @@ public abstract class AbstractJournalTest { JournalReader reader = journal.openReader(1); // Append a couple entries. - Indexed indexed; assertEquals(1, writer.getNextIndex()); - indexed = writer.append(ENTRY); + var indexed = writer.append(ENTRY); assertEquals(1, indexed.index()); assertEquals(2, writer.getNextIndex()); writer.append(ENTRY); reader.reset(2); - indexed = reader.next(); + indexed = reader.tryNext(); + assertNotNull(indexed); assertEquals(2, indexed.index()); - assertFalse(reader.hasNext()); + assertNull(reader.tryNext()); // Test reading an entry - Indexed entry1; reader.reset(); - entry1 = reader.next(); + var entry1 = reader.tryNext(); + assertNotNull(entry1); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); // Test reading a second entry - Indexed entry2; - assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); - entry2 = reader.next(); + var entry2 = reader.tryNext(); + assertNotNull(entry2); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); - assertFalse(reader.hasNext()); + assertEquals(3, reader.getNextIndex()); + assertNull(reader.tryNext()); // Test opening a new reader and reading from the journal. reader = journal.openReader(1); - assertTrue(reader.hasNext()); - entry1 = reader.next(); + entry1 = reader.tryNext(); + assertNotNull(entry1); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); - assertTrue(reader.hasNext()); - assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); - entry2 = reader.next(); + entry2 = reader.tryNext(); + assertNotNull(entry2); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); - assertFalse(reader.hasNext()); + assertNull(reader.tryNext()); // Reset the reader. reader.reset(); // Test opening a new reader and reading from the journal. reader = journal.openReader(1); - assertTrue(reader.hasNext()); - entry1 = reader.next(); + entry1 = reader.tryNext(); + assertNotNull(entry1); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); - assertTrue(reader.hasNext()); - assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); - entry2 = reader.next(); + entry2 = reader.tryNext(); + assertNotNull(entry2); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); - assertFalse(reader.hasNext()); + assertNull(reader.tryNext()); // Truncate the journal and write a different entry. writer.truncate(1); assertEquals(2, writer.getNextIndex()); writer.append(ENTRY); reader.reset(2); - indexed = reader.next(); + indexed = reader.tryNext(); + assertNotNull(indexed); assertEquals(2, indexed.index()); // Reset the reader to a specific index and read the last entry again. @@ -187,13 +182,13 @@ public abstract class AbstractJournalTest { assertNotNull(reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); assertEquals(1, reader.getCurrentEntry().index()); - assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); - entry2 = reader.next(); + entry2 = reader.tryNext(); + assertNotNull(entry2); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); - assertFalse(reader.hasNext()); + assertNull(reader.tryNext()); } } @@ -209,15 +204,19 @@ public abstract class AbstractJournalTest { writer.reset(1); assertEquals(0, writer.getLastIndex()); writer.append(ENTRY); - assertEquals(1, reader.next().index()); + + var indexed = reader.tryNext(); + assertNotNull(indexed); + assertEquals(1, indexed.index()); writer.reset(1); assertEquals(0, writer.getLastIndex()); writer.append(ENTRY); assertEquals(1, writer.getLastIndex()); assertEquals(1, writer.getLastEntry().index()); - assertTrue(reader.hasNext()); - assertEquals(1, reader.next().index()); + indexed = reader.tryNext(); + assertNotNull(indexed); + assertEquals(1, indexed.index()); writer.truncate(0); assertEquals(0, writer.getLastIndex()); @@ -226,8 +225,9 @@ public abstract class AbstractJournalTest { assertEquals(1, writer.getLastIndex()); assertEquals(1, writer.getLastEntry().index()); - assertTrue(reader.hasNext()); - assertEquals(1, reader.next().index()); + indexed = reader.tryNext(); + assertNotNull(indexed); + assertEquals(1, indexed.index()); } } @@ -243,21 +243,22 @@ public abstract class AbstractJournalTest { } for (int j = 1; j <= i - 2; j++) { - assertTrue(reader.hasNext()); - assertEquals(j, reader.next().index()); + final var indexed = reader.tryNext(); + assertNotNull(indexed); + assertEquals(j, indexed.index()); } writer.truncate(i - 2); - assertFalse(reader.hasNext()); + assertNull(reader.tryNext()); assertEquals(i - 1, writer.append(new TestEntry(32)).index()); assertEquals(i, writer.append(new TestEntry(32)).index()); - assertTrue(reader.hasNext()); - Indexed entry = reader.next(); + Indexed entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i - 1, entry.index()); - assertTrue(reader.hasNext()); - entry = reader.next(); + entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); } } @@ -270,13 +271,13 @@ public abstract class AbstractJournalTest { for (int i = 1; i <= entriesPerSegment * 5; i++) { writer.append(ENTRY); - assertTrue(reader.hasNext()); - Indexed entry; - entry = reader.next(); + var entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); reader.reset(i); - entry = reader.next(); + entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); @@ -292,10 +293,10 @@ public abstract class AbstractJournalTest { writer.truncate(i - 1); writer.append(ENTRY); - assertTrue(reader.hasNext()); + assertNotNull(reader.tryNext()); reader.reset(i); - assertTrue(reader.hasNext()); - entry = reader.next(); + entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); } @@ -310,37 +311,15 @@ public abstract class AbstractJournalTest { for (int i = 1; i <= entriesPerSegment * 5; i++) { writer.append(ENTRY); - assertFalse(reader.hasNext()); - writer.commit(i); - assertTrue(reader.hasNext()); - Indexed entry; - entry = reader.next(); - assertEquals(i, entry.index()); - assertEquals(32, entry.entry().bytes().length); - reader.reset(i); - entry = reader.next(); - assertEquals(i, entry.index()); - assertEquals(32, entry.entry().bytes().length); - } - } - } - - // Same as testWriteReadCommittedEntries(), but does not use hasNext() but checks whether an exception is thrown - @Test - public void testWriteReadCommittedEntriesException() throws Exception { - try (Journal journal = createJournal()) { - JournalWriter writer = journal.writer(); - JournalReader reader = journal.openReader(1, JournalReader.Mode.COMMITS); - - for (int i = 1; i <= entriesPerSegment * 5; i++) { - writer.append(ENTRY); - assertThrows(NoSuchElementException.class, reader::next); + assertNull(reader.tryNext()); writer.commit(i); - Indexed entry = reader.next(); + var entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); reader.reset(i); - entry = reader.next(); + entry = reader.tryNext(); + assertNotNull(entry); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); } @@ -359,33 +338,46 @@ public abstract class AbstractJournalTest { } assertEquals(1, uncommittedReader.getNextIndex()); - assertTrue(uncommittedReader.hasNext()); assertEquals(1, committedReader.getNextIndex()); - assertFalse(committedReader.hasNext()); + + // This creates asymmetry, as uncommitted reader will move one step ahead... + assertNotNull(uncommittedReader.tryNext()); + assertEquals(2, uncommittedReader.getNextIndex()); + assertNull(committedReader.tryNext()); + assertEquals(1, committedReader.getNextIndex()); writer.commit(entriesPerSegment * 9); - assertTrue(uncommittedReader.hasNext()); - assertTrue(committedReader.hasNext()); + // ... so here we catch up ... + assertNotNull(committedReader.tryNext()); + assertEquals(2, committedReader.getNextIndex()); + + // ... and continue from the second entry + for (int i = 2; i <= entriesPerSegment * 2.5; i++) { + var entry = uncommittedReader.tryNext(); + assertNotNull(entry); + assertEquals(i, entry.index()); - for (int i = 1; i <= entriesPerSegment * 2.5; i++) { - assertEquals(i, uncommittedReader.next().index()); - assertEquals(i, committedReader.next().index()); + entry = committedReader.tryNext(); + assertNotNull(entry); + assertEquals(i, entry.index()); } journal.compact(entriesPerSegment * 5 + 1); assertNull(uncommittedReader.getCurrentEntry()); assertEquals(0, uncommittedReader.getCurrentIndex()); - assertTrue(uncommittedReader.hasNext()); assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex()); - assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index()); + var entry = uncommittedReader.tryNext(); + assertNotNull(entry); + assertEquals(entriesPerSegment * 5 + 1, entry.index()); assertNull(committedReader.getCurrentEntry()); assertEquals(0, committedReader.getCurrentIndex()); - assertTrue(committedReader.hasNext()); assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex()); - assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index()); + entry = committedReader.tryNext(); + assertNotNull(entry); + assertEquals(entriesPerSegment * 5 + 1, entry.index()); } } @@ -417,10 +409,10 @@ public abstract class AbstractJournalTest { // Ensure the reader starts at the first physical index in the journal. assertEquals(entriesPerSegment + 1, reader.getNextIndex()); assertEquals(reader.getFirstIndex(), reader.getNextIndex()); - assertTrue(reader.hasNext()); - assertEquals(entriesPerSegment + 1, reader.getNextIndex()); - assertEquals(reader.getFirstIndex(), reader.getNextIndex()); - assertEquals(entriesPerSegment + 1, reader.next().index()); + final var indexed = reader.tryNext(); + assertNotNull(indexed); + assertEquals(entriesPerSegment + 1, indexed.index()); + assertEquals(entriesPerSegment + 2, reader.getNextIndex()); } } diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index 4351fd40ac..24e8fec03a 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -82,9 +82,9 @@ final class DataJournalV0 extends DataJournal { private void handleReplayMessages(final JournalReader reader, final ReplayMessages message) { int count = 0; - while (reader.hasNext() && count < message.max) { - final var next = reader.next(); - if (next.index() > message.toSequenceNr) { + while (count < message.max) { + final var next = reader.tryNext(); + if (next == null || next.index() > message.toSequenceNr) { break; }