Do not call nextSegment() from {first,last}Segment() 52/111652/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2024 21:24:33 +0000 (23:24 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2024 22:31:08 +0000 (00:31 +0200)
We always have at last one segment and nextSegment() is the slow path
here (requiring synchronization). Just assume {first,last}Entry()
returns non-null.

This assumption is violated while we are removing a segment, as we could
be removing the last segment. In that case we need to re-create it.

Also improve method names and documentation a bit.

JIRA: CONTROLLER-2115
Change-Id: I74bb1578e73828666ee795522a68f14ad112ec75
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java

index c3e4b2bad15d847bf3c2fc528a0b6959bc5d2bd8..dc837eaabeb58a876df84e393d7bf6474e542d57 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.BiFunction;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +52,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private final boolean flushOnCommit;
     private final @NonNull ByteBufWriter writer;
 
+    // null when closed
     private JournalSegment currentSegment;
     private volatile long commitIndex;
 
@@ -63,7 +65,13 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         this.maxEntrySize = maxEntrySize;
         this.indexDensity = indexDensity;
         this.flushOnCommit = flushOnCommit;
-        open();
+
+        // Load existing log segments from disk.
+        for (var segment : loadSegments()) {
+            segments.put(segment.firstIndex(), segment);
+        }
+        currentSegment = ensureLastSegment();
+
         writer = new SegmentedByteBufWriter(this);
     }
 
@@ -108,23 +116,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         return openReader(index, SegmentedCommitsByteBufReader::new);
     }
 
-    /**
-     * Opens the segments.
-     */
-    private synchronized void open() {
-        // Load existing log segments from disk.
-        for (var segment : loadSegments()) {
-            segments.put(segment.firstIndex(), segment);
-        }
-        // If a segment doesn't already exist, create an initial segment starting at index 1.
-        if (segments.isEmpty()) {
-            currentSegment = createSegment(1, 1);
-            segments.put(1L, currentSegment);
-        }  else {
-            currentSegment = segments.lastEntry().getValue();
-        }
-    }
-
     /**
      * Asserts that the manager is open.
      *
@@ -143,19 +134,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         }
     }
 
-    /**
-     * Resets the current segment, creating a new segment if necessary.
-     */
-    private synchronized void resetCurrentSegment() {
-        final var lastSegment = lastSegment();
-        if (lastSegment == null) {
-            currentSegment = createSegment(1, 1);
-            segments.put(1L, currentSegment);
-        } else {
-            currentSegment = lastSegment;
-        }
-    }
-
     /**
      * Resets and returns the first segment in the journal.
      *
@@ -173,10 +151,9 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
 
         segments.values().forEach(JournalSegment::delete);
         segments.clear();
-
-        currentSegment = createSegment(1, index);
-        segments.put(index, currentSegment);
-        return currentSegment;
+        final var newSegment = createInitialSegment();
+        currentSegment = newSegment;
+        return newSegment;
     }
 
     /**
@@ -186,8 +163,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
      */
     JournalSegment firstSegment() {
         assertOpen();
-        final var firstEntry = segments.firstEntry();
-        return firstEntry != null ? firstEntry.getValue() : nextSegment();
+        return segments.firstEntry().getValue();
     }
 
     /**
@@ -197,8 +173,18 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
      */
     JournalSegment lastSegment() {
         assertOpen();
-        final var lastEntry = segments.lastEntry();
-        return lastEntry != null ? lastEntry.getValue() : nextSegment();
+        return segments.lastEntry().getValue();
+    }
+
+    /**
+     * Returns the segment following the segment with the given ID.
+     *
+     * @param index The segment index with which to look up the next segment.
+     * @return The next segment for the given index, or {@code null} if no such segment exists
+     */
+    @Nullable JournalSegment tryNextSegment(final long index) {
+        final var higherEntry = segments.higherEntry(index);
+        return higherEntry != null ? higherEntry.getValue() : null;
     }
 
     /**
@@ -207,26 +193,17 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
      * @return The next segment.
      * @throws IllegalStateException if the segment manager is not open
      */
-    synchronized JournalSegment nextSegment() {
+    synchronized @NonNull JournalSegment createNextSegment() {
         assertOpen();
         assertDiskSpace();
 
+        // FIXME: lastSegment should equal currentSegment. We should be asserting that.
         final var index = currentSegment.lastIndex() + 1;
         final var lastSegment = lastSegment();
-        currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
-        segments.put(index, currentSegment);
-        return currentSegment;
-    }
-
-    /**
-     * Returns the segment following the segment with the given ID.
-     *
-     * @param index The segment index with which to look up the next segment.
-     * @return The next segment for the given index.
-     */
-    JournalSegment nextSegment(final long index) {
-        final var higherEntry = segments.higherEntry(index);
-        return higherEntry != null ? higherEntry.getValue() : null;
+        final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index);
+        segments.put(index, nextSegment);
+        currentSegment = nextSegment;
+        return nextSegment;
     }
 
     /**
@@ -255,18 +232,24 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     synchronized void removeSegment(final JournalSegment segment) {
         segments.remove(segment.firstIndex());
         segment.delete();
-        resetCurrentSegment();
+
+        // Reset current segment to last segment
+        currentSegment = ensureLastSegment();
     }
 
     /**
      * Creates a new segment.
+     *
+     * @param segmentId the segment ID
+     * @param firstIndex index of first entry
+     * @param A new segment
      */
-    JournalSegment createSegment(final long id, final long index) {
+    private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) {
         final JournalSegmentFile file;
         try {
             file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
-                .withId(id)
-                .withIndex(index)
+                .withId(segmentId)
+                .withIndex(firstIndex)
                 .withMaxSegmentSize(maxSegmentSize)
                 // FIXME: propagate maxEntries
                 .withMaxEntries(Integer.MAX_VALUE)
@@ -281,12 +264,29 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         return segment;
     }
 
+    private @NonNull JournalSegment createInitialSegment() {
+        final var segment = createSegment(1, 1);
+        segments.put(1L, segment);
+        return segment;
+    }
+
+    /**
+     * Make sure there is a last segment and return it.
+     *
+     * @return the last segment
+     */
+    private JournalSegment ensureLastSegment() {
+        final var lastEntry = segments.lastEntry();
+        // if there is no segment, create an initial segment starting at index 1.
+        return lastEntry != null ? lastEntry.getValue() : createInitialSegment();
+    }
+
     /**
      * Loads all segments from disk.
      *
      * @return A collection of segments for the log.
      */
-    protected Collection<JournalSegment> loadSegments() {
+    private Collection<JournalSegment> loadSegments() {
         // Ensure log directories are created.
         directory.mkdirs();
 
index d1646768452767bac92999bb65f49889bce18d9f..d7eb68e8478811c1ff81c279bb38b8a7f9933132 100644 (file)
@@ -125,7 +125,7 @@ sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCo
     ByteBuf tryAdvance(final long index) {
         var buf = currentReader.readBytes();
         if (buf == null) {
-            final var nextSegment = journal.nextSegment(currentSegment.firstIndex());
+            final var nextSegment = journal.tryNextSegment(currentSegment.firstIndex());
             if (nextSegment == null || nextSegment.firstIndex() != index) {
                 return null;
             }
index 7e92815f346b5f81ae479cb3092583618aa4c3ff..0d942dd08533535dde2ffce8d641d179dda535e7 100644 (file)
@@ -77,7 +77,7 @@ final class SegmentedByteBufWriter implements ByteBufWriter {
         //  Slow path: we do not have enough capacity
         currentWriter.flush();
         currentSegment.releaseWriter();
-        currentSegment = journal.nextSegment();
+        currentSegment = journal.createNextSegment();
         currentWriter = currentSegment.acquireWriter();
         return verifyNotNull(currentWriter.append(buf));
     }