From: Robert Varga Date: Tue, 12 Mar 2024 19:47:54 +0000 (+0100) Subject: Optimize SegmentedJournalReader.hasNext() X-Git-Tag: v8.0.5~19 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=1d40ac59713467ba7588cad370136ee77f808ba4;p=controller.git Optimize SegmentedJournalReader.hasNext() hasNext() is an extremely hot method and its behaviour depends on JournalReader.Mode, which is invariant for a particular instance. Expose the invariant to the JVM by splitting the class into the base, which services Mode.ALL and a subclass, which performs the commit index check. Also move segment acquisition and reader positioning to SegmentedJournal, as otherwise we would be calling hasNext() from constructor, which is getting flagged by SpotBugs. JIRA: CONTROLLER-2106 Change-Id: I29fca69a211fc8f21619a54fe59a8ff4012b01d0 Signed-off-by: Robert Varga (cherry picked from commit 87cf9d356ccaf6153ddbc0a763155c6787dc2932) --- diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java new file mode 100644 index 0000000000..8dfb14fa86 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +/** + * A {@link JournalReader} traversing only committed entries. + */ +final class CommitsSegmentJournalReader extends SegmentedJournalReader { + CommitsSegmentJournalReader(SegmentedJournal journal, JournalSegment segment) { + super(journal, segment); + } + + @Override + public boolean hasNext() { + return getNextIndex() <= journal.getCommitIndex() && super.hasNext(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 865daeee52..7da4655a48 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -210,7 +210,17 @@ public final class SegmentedJournal implements Journal { * @return The Raft log reader. */ public JournalReader openReader(long index, JournalReader.Mode mode) { - final var reader = new SegmentedJournalReader<>(this, index, mode); + final var segment = getSegment(index); + final var reader = switch (mode) { + case ALL -> new SegmentedJournalReader<>(this, segment); + case COMMITS -> new CommitsSegmentJournalReader<>(this, segment); + }; + + // Forward reader to specified index + for (long next = reader.getNextIndex(); index > next && reader.hasNext(); next = reader.getNextIndex()) { + reader.next(); + } + readers.add(reader); return reader; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index 32d72176c3..3022b51903 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -15,38 +15,32 @@ */ package io.atomix.storage.journal; +import static java.util.Objects.requireNonNull; + import java.util.NoSuchElementException; /** - * Raft log reader. + * A {@link JournalReader} traversing all entries. */ -final class SegmentedJournalReader implements JournalReader { - private final SegmentedJournal journal; +sealed class SegmentedJournalReader implements JournalReader permits CommitsSegmentJournalReader { + final SegmentedJournal journal; private JournalSegment currentSegment; private Indexed previousEntry; private JournalSegmentReader currentReader; - private final Mode mode; - - SegmentedJournalReader(SegmentedJournal journal, long index, Mode mode) { - this.journal = journal; - this.mode = mode; - currentSegment = journal.getSegment(index); - currentReader = currentSegment.createReader(); - long nextIndex = getNextIndex(); - while (index > nextIndex && hasNext()) { - next(); - nextIndex = getNextIndex(); - } + SegmentedJournalReader(SegmentedJournal journal, JournalSegment segment) { + this.journal = requireNonNull(journal); + currentSegment = requireNonNull(segment); + currentReader = segment.createReader(); } @Override - public long getFirstIndex() { + public final long getFirstIndex() { return journal.getFirstSegment().index(); } @Override - public long getCurrentIndex() { + public final long getCurrentIndex() { long currentIndex = currentReader.getCurrentIndex(); if (currentIndex != 0) { return currentIndex; @@ -58,7 +52,7 @@ final class SegmentedJournalReader implements JournalReader { } @Override - public Indexed getCurrentEntry() { + public final Indexed getCurrentEntry() { Indexed currentEntry = currentReader.getCurrentEntry(); if (currentEntry != null) { return currentEntry; @@ -67,12 +61,12 @@ final class SegmentedJournalReader implements JournalReader { } @Override - public long getNextIndex() { + public final long getNextIndex() { return currentReader.getNextIndex(); } @Override - public void reset() { + public final void reset() { previousEntry = null; currentReader.close(); @@ -81,7 +75,7 @@ final class SegmentedJournalReader implements JournalReader { } @Override - public void reset(long index) { + public final void reset(long index) { // If the current segment is not open, it has been replaced. Reset the segments. if (!currentSegment.isOpen()) { reset(); @@ -125,24 +119,11 @@ final class SegmentedJournalReader implements JournalReader { @Override public boolean hasNext() { - if (mode == Mode.ALL) { - return hasNextEntry(); - } - - long nextIndex = getNextIndex(); - long commitIndex = journal.getCommitIndex(); - return nextIndex <= commitIndex && hasNextEntry(); - } - - private boolean hasNextEntry() { - if (currentReader.hasNext()) { - return true; - } - return moveToNextSegment() ? currentReader.hasNext() : false; + return currentReader.hasNext() || moveToNextSegment() && currentReader.hasNext(); } @Override - public Indexed next() { + public final Indexed next() { if (currentReader.hasNext()) { previousEntry = currentReader.getCurrentEntry(); return currentReader.next(); @@ -154,7 +135,7 @@ final class SegmentedJournalReader implements JournalReader { } @Override - public void close() { + public final void close() { currentReader.close(); journal.closeReader(this); }