From 794f28ea9f2c22dfb7042266b71ada659a920ab7 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 21:51:31 +0200 Subject: [PATCH] Move JournalWriter.getLastIndex() Last written index is a property of a particular Journal, not of a writer -- and now that we maintain this in the index, we can make shortcuts. This also removes a source of confusion, as we have two methods taking a 'long index' and performing some writer adjustments: - reset(long) is equivalent of setNextIndex() - truncate(long) is equivalent of setLastIndex() Change-Id: I1bc4b5d1b3052c2b35808b8ec4ea2d88dcfca593 Signed-off-by: Robert Varga --- .../storage/journal/ByteBufJournal.java | 7 ++++ .../atomix/storage/journal/ByteBufWriter.java | 7 ---- .../io/atomix/storage/journal/Journal.java | 7 ++++ .../storage/journal/JournalSegment.java | 7 ++-- .../storage/journal/JournalSegmentWriter.java | 38 +++++++------------ .../atomix/storage/journal/JournalWriter.java | 7 ---- .../journal/SegmentedByteBufJournal.java | 5 +++ .../journal/SegmentedByteBufWriter.java | 7 +--- .../storage/journal/SegmentedJournal.java | 5 +++ .../journal/SegmentedJournalWriter.java | 5 --- .../storage/journal/AbstractJournalTest.java | 18 ++++++--- .../akka/segjournal/DataJournalV0.java | 11 +++--- .../segjournal/SegmentedJournalActor.java | 10 +++-- 13 files changed, 68 insertions(+), 66 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java index baaa6b0ba9..dc3e75bcde 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java @@ -23,6 +23,13 @@ import org.eclipse.jdt.annotation.NonNullByDefault; */ @NonNullByDefault public interface ByteBufJournal extends AutoCloseable { + /** + * Return the index of the last entry in the journal. + * + * @return the last index, or zero if there are no entries. + */ + long lastIndex(); + /** * Returns the journal writer. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java index 2f85cc6163..58f9d35291 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault; */ @NonNullByDefault public interface ByteBufWriter { - /** - * Returns the last written index. - * - * @return The last written index - */ - long lastIndex(); - /** * Returns the next index to be written. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java index 93ae0a565b..39be7d4d5f 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java @@ -23,6 +23,13 @@ import io.atomix.storage.journal.JournalReader.Mode; * @author Jordan Halterman */ public interface Journal extends AutoCloseable { + /** + * Return the index of the last entry in the journal. + * + * @return the last index, or zero if there are no entries. + */ + long lastIndex(); + /** * Returns the journal writer. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index 2128b87e20..844a1c9214 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -39,12 +39,12 @@ import org.slf4j.LoggerFactory; final class JournalSegment { private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class); + private final Set readers = ConcurrentHashMap.newKeySet(); + private final AtomicInteger references = new AtomicInteger(); private final JournalSegmentFile file; private final StorageLevel storageLevel; private final int maxEntrySize; private final JournalIndex journalIndex; - private final Set readers = ConcurrentHashMap.newKeySet(); - private final AtomicInteger references = new AtomicInteger(); private JournalSegmentWriter writer; private boolean open = true; @@ -83,7 +83,8 @@ final class JournalSegment { * @return The last index in the segment. */ long lastIndex() { - return writer.getLastIndex(); + final var lastPosition = journalIndex.last(); + return lastPosition != null ? lastPosition.index() : firstIndex() - 1; } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index b18371f044..70cc790389 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -18,6 +18,7 @@ package io.atomix.storage.journal; import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import static java.util.Objects.requireNonNull; +import io.atomix.storage.journal.StorageException.TooLarge; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; import io.netty.buffer.ByteBuf; @@ -32,17 +33,17 @@ final class JournalSegmentWriter { private final FileWriter fileWriter; final @NonNull JournalSegment segment; - private final @NonNull JournalIndex index; + private final @NonNull JournalIndex journalIndex; final int maxSegmentSize; final int maxEntrySize; private int currentPosition; JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize, - final JournalIndex index) { + final JournalIndex journalIndex) { this.fileWriter = requireNonNull(fileWriter); this.segment = requireNonNull(segment); - this.index = requireNonNull(index); + this.journalIndex = requireNonNull(journalIndex); maxSegmentSize = segment.file().maxSize(); this.maxEntrySize = maxEntrySize; // adjust lastEntry value @@ -51,31 +52,21 @@ final class JournalSegmentWriter { JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) { segment = previous.segment; - index = previous.index; + journalIndex = previous.journalIndex; maxSegmentSize = previous.maxSegmentSize; maxEntrySize = previous.maxEntrySize; currentPosition = previous.currentPosition; this.fileWriter = requireNonNull(fileWriter); } - /** - * Returns the last written index. - * - * @return The last written index. - */ - long getLastIndex() { - final var last = index.last(); - return last != null ? last.index() : segment.firstIndex() - 1; - } - /** * Returns the next index to be written. * * @return The next index to be written. */ - long getNextIndex() { - final var last = index.last(); - return last != null ? last.index() + 1 : segment.firstIndex(); + long nextIndex() { + final var lastPosition = journalIndex.last(); + return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex(); } /** @@ -87,12 +78,11 @@ final class JournalSegmentWriter { Position append(final ByteBuf buf) { final var length = buf.readableBytes(); if (length > maxEntrySize) { - throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes (" - + maxEntrySize + ")"); + throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); } // Store the entry index. - final long index = getNextIndex(); + final long index = nextIndex(); final int position = currentPosition; // check space available @@ -114,7 +104,7 @@ final class JournalSegmentWriter { // Update the last entry with the correct index/term/length. currentPosition = nextPosition; - return this.index.index(index, position); + return journalIndex.index(index, position); } /** @@ -147,7 +137,7 @@ final class JournalSegmentWriter { break; } - this.index.index(nextIndex++, currentPosition); + journalIndex.index(nextIndex++, currentPosition); // Update the current position for indexing. currentPosition += HEADER_BYTES + buf.readableBytes(); @@ -161,12 +151,12 @@ final class JournalSegmentWriter { */ void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. - if (index >= getLastIndex()) { + if (index >= segment.lastIndex()) { return; } // Truncate the index. - this.index.truncate(index); + journalIndex.truncate(index); if (index < segment.firstIndex()) { // Reset the writer to the first entry. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index ae65778185..0561c99fe0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNull; * @author Jordan Halterman */ public interface JournalWriter { - /** - * Returns the last written index. - * - * @return The last written index. - */ - long getLastIndex(); - /** * Returns the next index to be written. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index 074a5dd182..4698a5fd60 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -92,6 +92,11 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { .sum(); } + @Override + public long lastIndex() { + return lastSegment().lastIndex(); + } + @Override public ByteBufWriter writer() { return writer; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index e16e6446eb..c51e8a2ffa 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -35,14 +35,9 @@ final class SegmentedByteBufWriter implements ByteBufWriter { currentWriter = currentSegment.acquireWriter(); } - @Override - public long lastIndex() { - return currentWriter.getLastIndex(); - } - @Override public long nextIndex() { - return currentWriter.getNextIndex(); + return currentWriter.nextIndex(); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 8f0464ebe3..f6c976742c 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -34,6 +34,11 @@ public final class SegmentedJournal implements Journal { writer = new SegmentedJournalWriter<>(journal.writer(), mapper); } + @Override + public long lastIndex() { + return journal.lastIndex(); + } + @Override public JournalWriter writer() { return writer; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 80c352ead0..11aa6c2431 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -30,11 +30,6 @@ final class SegmentedJournalWriter implements JournalWriter { this.mapper = requireNonNull(mapper); } - @Override - public long getLastIndex() { - return writer.lastIndex(); - } - @Override public long getNextIndex() { return writer.nextIndex(); diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java index d4bc43d9b2..026e58df7b 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java @@ -176,28 +176,34 @@ public abstract class AbstractJournalTest { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); writer.append(ENTRY); writer.append(ENTRY); writer.reset(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); writer.append(ENTRY); var indexed = assertNext(reader); assertEquals(1, indexed.index()); writer.reset(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); indexed = writer.append(ENTRY); - assertEquals(1, writer.getLastIndex()); + assertEquals(1, journal.lastIndex()); + assertEquals(2, writer.getNextIndex()); assertEquals(1, indexed.index()); indexed = assertNext(reader); assertEquals(1, indexed.index()); writer.truncate(0); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); indexed = writer.append(ENTRY); - assertEquals(1, writer.getLastIndex()); + assertEquals(1, journal.lastIndex()); + assertEquals(2, writer.getNextIndex()); assertEquals(1, indexed.index()); indexed = assertNext(reader); diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index ad4c110bc8..bf1700f7f0 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -51,7 +51,7 @@ final class DataJournalV0 extends DataJournal { @Override long lastWrittenSequenceNr() { - return entries.writer().getLastIndex(); + return entries.lastIndex(); } @Override @@ -120,17 +120,18 @@ final class DataJournalV0 extends DataJournal { long writtenBytes = 0; for (int i = 0; i < count; ++i) { - final long mark = writer.getLastIndex(); + final long prevNextIndex = writer.getNextIndex(); final var request = message.getRequest(i); final var reprs = CollectionConverters.asJava(request.payload()); - LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark); + LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), prevNextIndex); try { writtenBytes += writePayload(writer, reprs); } catch (Exception e) { - LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e); + LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, + prevNextIndex, e); responses.add(e); - writer.truncate(mark); + writer.reset(prevNextIndex); continue; } responses.add(null); diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 73ffab6a05..7e285f7d0d 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -494,9 +494,13 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { } final var sw = Stopwatch.createStarted(); - deleteJournal = SegmentedJournal.builder().withDirectory(directory).withName("delete") - .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build(); - final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex()) + deleteJournal = SegmentedJournal.builder() + .withDirectory(directory) + .withName("delete") + .withNamespace(DELETE_NAMESPACE) + .withMaxSegmentSize(DELETE_SEGMENT_SIZE) + .build(); + final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex()) .tryNext((index, value, length) -> value); lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue(); -- 2.36.6