Clean up SegmentedJournalReader 83/110583/2
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 07:44:57 +0000 (08:44 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
We have bit of duplicated code dealing with moving to next segment.
Concentrate it to a single method.

This shows the code actually has a bug, where it fails to close the
reader -- hence potentially leaking.

JIRA: CONTROLLER-2098
Change-Id: Ib58116d21cedde58c8f84d5fffd245e8e8408935
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java

index 3b19f4ea3b1ff1fdc5013eb4e1c42fcd1ac50626..d5201b3c26da95c6c503d3a9df3910ebba73338d 100644 (file)
@@ -20,19 +20,20 @@ import java.util.NoSuchElementException;
 /**
  * Raft log reader.
  */
-public class SegmentedJournalReader<E> implements JournalReader<E> {
+public final class SegmentedJournalReader<E> implements JournalReader<E> {
   private final SegmentedJournal<E> journal;
   private JournalSegment<E> currentSegment;
   private Indexed<E> previousEntry;
   private MappableJournalSegmentReader<E> currentReader;
   private final Mode mode;
 
-  public SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
+  SegmentedJournalReader(SegmentedJournal<E> journal, long index, Mode mode) {
     this.journal = journal;
     this.mode = mode;
     currentSegment = journal.getSegment(index);
     currentSegment.acquire();
     currentReader = currentSegment.createReader();
+
     long nextIndex = getNextIndex();
     while (index > nextIndex && hasNext()) {
       next();
@@ -73,12 +74,13 @@ public class SegmentedJournalReader<E> implements JournalReader<E> {
 
   @Override
   public void reset() {
+    previousEntry = null;
     currentReader.close();
     currentSegment.release();
+
     currentSegment = journal.getFirstSegment();
     currentSegment.acquire();
     currentReader = currentSegment.createReader();
-    previousEntry = null;
   }
 
   @Override
@@ -106,6 +108,7 @@ public class SegmentedJournalReader<E> implements JournalReader<E> {
       if (segment != null) {
         currentReader.close();
         currentSegment.release();
+
         currentSegment = segment;
         currentSegment.acquire();
         currentReader = currentSegment.createReader();
@@ -137,39 +140,22 @@ public class SegmentedJournalReader<E> implements JournalReader<E> {
   }
 
   private boolean hasNextEntry() {
-    if (!currentReader.hasNext()) {
-      JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
-      if (nextSegment != null && nextSegment.index() == getNextIndex()) {
-        previousEntry = currentReader.getCurrentEntry();
-        currentSegment.release();
-        currentSegment = nextSegment;
-        currentSegment.acquire();
-        currentReader = currentSegment.createReader();
-        return currentReader.hasNext();
-      }
-      return false;
+    if (currentReader.hasNext()) {
+      return true;
     }
-    return true;
+    return moveToNextSegment() ? currentReader.hasNext() : false;
   }
 
   @Override
   public Indexed<E> next() {
-    if (!currentReader.hasNext()) {
-      JournalSegment<E> nextSegment = journal.getNextSegment(currentSegment.index());
-      if (nextSegment != null && nextSegment.index() == getNextIndex()) {
-        previousEntry = currentReader.getCurrentEntry();
-        currentSegment.release();
-        currentSegment = nextSegment;
-        currentSegment.acquire();
-        currentReader = currentSegment.createReader();
-        return currentReader.next();
-      } else {
-        throw new NoSuchElementException();
-      }
-    } else {
+    if (currentReader.hasNext()) {
       previousEntry = currentReader.getCurrentEntry();
       return currentReader.next();
     }
+    if (moveToNextSegment()) {
+      return currentReader.next();
+    }
+    throw new NoSuchElementException();
   }
 
   @Override
@@ -177,4 +163,20 @@ public class SegmentedJournalReader<E> implements JournalReader<E> {
     currentReader.close();
     journal.closeReader(this);
   }
+
+  private boolean moveToNextSegment() {
+    final var nextSegment = journal.getNextSegment(currentSegment.index());
+    if (nextSegment == null || nextSegment.index() != getNextIndex()) {
+      return false;
+    }
+
+    previousEntry = currentReader.getCurrentEntry();
+    currentReader.close();
+    currentSegment.release();
+
+    currentSegment = nextSegment;
+    currentSegment.acquire();
+    currentReader = currentSegment.createReader();
+    return true;
+  }
 }