From 204c9c2f7dff16dc85b2a211c88347aeaa412591 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 23:24:33 +0200 Subject: [PATCH] Do not call nextSegment() from {first,last}Segment() We always have at last one segment and nextSegment() is the slow path here (requiring synchronization). Just assume {first,last}Entry() returns non-null. This assumption is violated while we are removing a segment, as we could be removing the last segment. In that case we need to re-create it. Also improve method names and documentation a bit. JIRA: CONTROLLER-2115 Change-Id: I74bb1578e73828666ee795522a68f14ad112ec75 Signed-off-by: Robert Varga --- .../journal/SegmentedByteBufJournal.java | 118 +++++++++--------- .../journal/SegmentedByteBufReader.java | 2 +- .../journal/SegmentedByteBufWriter.java | 2 +- 3 files changed, 61 insertions(+), 61 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index c3e4b2bad1..dc837eaabe 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.BiFunction; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { private final boolean flushOnCommit; private final @NonNull ByteBufWriter writer; + // null when closed private JournalSegment currentSegment; private volatile long commitIndex; @@ -63,7 +65,13 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { this.maxEntrySize = maxEntrySize; this.indexDensity = indexDensity; this.flushOnCommit = flushOnCommit; - open(); + + // Load existing log segments from disk. + for (var segment : loadSegments()) { + segments.put(segment.firstIndex(), segment); + } + currentSegment = ensureLastSegment(); + writer = new SegmentedByteBufWriter(this); } @@ -108,23 +116,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { return openReader(index, SegmentedCommitsByteBufReader::new); } - /** - * Opens the segments. - */ - private synchronized void open() { - // Load existing log segments from disk. - for (var segment : loadSegments()) { - segments.put(segment.firstIndex(), segment); - } - // If a segment doesn't already exist, create an initial segment starting at index 1. - if (segments.isEmpty()) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = segments.lastEntry().getValue(); - } - } - /** * Asserts that the manager is open. * @@ -143,19 +134,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { } } - /** - * Resets the current segment, creating a new segment if necessary. - */ - private synchronized void resetCurrentSegment() { - final var lastSegment = lastSegment(); - if (lastSegment == null) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = lastSegment; - } - } - /** * Resets and returns the first segment in the journal. * @@ -173,10 +151,9 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { segments.values().forEach(JournalSegment::delete); segments.clear(); - - currentSegment = createSegment(1, index); - segments.put(index, currentSegment); - return currentSegment; + final var newSegment = createInitialSegment(); + currentSegment = newSegment; + return newSegment; } /** @@ -186,8 +163,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { */ JournalSegment firstSegment() { assertOpen(); - final var firstEntry = segments.firstEntry(); - return firstEntry != null ? firstEntry.getValue() : nextSegment(); + return segments.firstEntry().getValue(); } /** @@ -197,8 +173,18 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { */ JournalSegment lastSegment() { assertOpen(); - final var lastEntry = segments.lastEntry(); - return lastEntry != null ? lastEntry.getValue() : nextSegment(); + return segments.lastEntry().getValue(); + } + + /** + * Returns the segment following the segment with the given ID. + * + * @param index The segment index with which to look up the next segment. + * @return The next segment for the given index, or {@code null} if no such segment exists + */ + @Nullable JournalSegment tryNextSegment(final long index) { + final var higherEntry = segments.higherEntry(index); + return higherEntry != null ? higherEntry.getValue() : null; } /** @@ -207,26 +193,17 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { * @return The next segment. * @throws IllegalStateException if the segment manager is not open */ - synchronized JournalSegment nextSegment() { + synchronized @NonNull JournalSegment createNextSegment() { assertOpen(); assertDiskSpace(); + // FIXME: lastSegment should equal currentSegment. We should be asserting that. final var index = currentSegment.lastIndex() + 1; final var lastSegment = lastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the segment following the segment with the given ID. - * - * @param index The segment index with which to look up the next segment. - * @return The next segment for the given index. - */ - JournalSegment nextSegment(final long index) { - final var higherEntry = segments.higherEntry(index); - return higherEntry != null ? higherEntry.getValue() : null; + final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index); + segments.put(index, nextSegment); + currentSegment = nextSegment; + return nextSegment; } /** @@ -255,18 +232,24 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { synchronized void removeSegment(final JournalSegment segment) { segments.remove(segment.firstIndex()); segment.delete(); - resetCurrentSegment(); + + // Reset current segment to last segment + currentSegment = ensureLastSegment(); } /** * Creates a new segment. + * + * @param segmentId the segment ID + * @param firstIndex index of first entry + * @param A new segment */ - JournalSegment createSegment(final long id, final long index) { + private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) { final JournalSegmentFile file; try { file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() - .withId(id) - .withIndex(index) + .withId(segmentId) + .withIndex(firstIndex) .withMaxSegmentSize(maxSegmentSize) // FIXME: propagate maxEntries .withMaxEntries(Integer.MAX_VALUE) @@ -281,12 +264,29 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { return segment; } + private @NonNull JournalSegment createInitialSegment() { + final var segment = createSegment(1, 1); + segments.put(1L, segment); + return segment; + } + + /** + * Make sure there is a last segment and return it. + * + * @return the last segment + */ + private JournalSegment ensureLastSegment() { + final var lastEntry = segments.lastEntry(); + // if there is no segment, create an initial segment starting at index 1. + return lastEntry != null ? lastEntry.getValue() : createInitialSegment(); + } + /** * Loads all segments from disk. * * @return A collection of segments for the log. */ - protected Collection loadSegments() { + private Collection loadSegments() { // Ensure log directories are created. directory.mkdirs(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java index d164676845..d7eb68e847 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java @@ -125,7 +125,7 @@ sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCo ByteBuf tryAdvance(final long index) { var buf = currentReader.readBytes(); if (buf == null) { - final var nextSegment = journal.nextSegment(currentSegment.firstIndex()); + final var nextSegment = journal.tryNextSegment(currentSegment.firstIndex()); if (nextSegment == null || nextSegment.firstIndex() != index) { return null; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 7e92815f34..0d942dd085 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -77,7 +77,7 @@ final class SegmentedByteBufWriter implements ByteBufWriter { // Slow path: we do not have enough capacity currentWriter.flush(); currentSegment.releaseWriter(); - currentSegment = journal.nextSegment(); + currentSegment = journal.createNextSegment(); currentWriter = currentSegment.acquireWriter(); return verifyNotNull(currentWriter.append(buf)); } -- 2.36.6