From 137b27fe400ad6f516749557fbb108bf4f6cfc25 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 16 Oct 2020 15:17:50 +0200 Subject: [PATCH 1/1] Update DataJournal interface The interface is not documented and we need to differentiate between indices as viewed from segmented journal and the persistence journal views. JIRA: CONTROLLER-1954 Change-Id: Ibbc384b88f8d5567e9af18d83b9a84f7c9b9634c Signed-off-by: Robert Varga (cherry picked from commit 255e74efd633f2fbca7ce4f1372004d93cc81a10) --- .../akka/segjournal/DataJournal.java | 39 ++++++++++++++++--- .../akka/segjournal/DataJournalEntry.java | 6 +++ .../akka/segjournal/DataJournalV0.java | 14 +++---- .../segjournal/SegmentedJournalActor.java | 19 +++++---- 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java index 3f746900c9..678749b1c1 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java @@ -42,15 +42,44 @@ abstract class DataJournal { } } - abstract long lastWrittenIndex(); + /** + * Return the last sequence number completely written to the journal. + * + * @return Last written sequence number, {@code -1} if there are no in the journal. + */ + abstract long lastWrittenSequenceNr(); - abstract void commitTo(long index); + /** + * Delete all messages up to specified sequence number. + * + * @param sequenceNr Sequence number to delete to. + */ + abstract void deleteTo(long sequenceNr); - abstract void compactTo(long index); + /** + * Delete all messages up to specified sequence number. + * + * @param sequenceNr Sequence number to compact to. + */ + abstract void compactTo(long sequenceNr); + /** + * Close this journal, freeing up resources associated with it. + */ abstract void close(); - abstract void handleReplayMessages(ReplayMessages message, long from); + /** + * Handle a request to replay messages. + * + * @param message Request message + * @param fromSequenceNr Sequence number to replay from, adjusted for deletions + */ + abstract void handleReplayMessages(@NonNull ReplayMessages message, long fromSequenceNr); - abstract void handleWriteMessages(WriteMessages message); + /** + * Handle a request to store some messages. + * + * @param message Request message + */ + abstract void handleWriteMessages(@NonNull WriteMessages message); } diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java index 0713c0212a..6899c6e1d6 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java @@ -20,6 +20,9 @@ import io.atomix.storage.journal.JournalSegment; * @author Robert Varga */ abstract class DataJournalEntry { + /** + * A single data journal entry on its way to the backing file. + */ static final class ToPersistence extends DataJournalEntry { private final PersistentRepr repr; @@ -32,6 +35,9 @@ abstract class DataJournalEntry { } } + /** + * A single data journal entry on its way from the backing file. + */ static final class FromPersistence extends DataJournalEntry { private final String manifest; private final String writerUuid; diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index 766f2fac05..bc5eead800 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -52,18 +52,18 @@ final class DataJournalV0 extends DataJournal { } @Override - long lastWrittenIndex() { + long lastWrittenSequenceNr() { return entries.writer().getLastIndex(); } @Override - void commitTo(final long index) { - entries.writer().commit(index); + void deleteTo(final long sequenceNr) { + entries.writer().commit(sequenceNr); } @Override - void compactTo(final long index) { - entries.compact(index); + void compactTo(final long sequenceNr) { + entries.compact(sequenceNr + 1); } @Override @@ -73,8 +73,8 @@ final class DataJournalV0 extends DataJournal { @Override @SuppressWarnings("checkstyle:illegalCatch") - void handleReplayMessages(final ReplayMessages message, final long from) { - try (SegmentedJournalReader reader = entries.openReader(from)) { + void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) { + try (SegmentedJournalReader reader = entries.openReader(fromSequenceNr)) { int count = 0; while (reader.hasNext() && count < message.max) { final Indexed next = reader.next(); diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 41117c8e32..e5c5b7807b 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -239,7 +239,7 @@ final class SegmentedJournalActor extends AbstractActor { ensureOpen(); LOG.debug("{}: delete messages {}", persistenceId, message); - final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr); + final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr); LOG.debug("{}: adjusted delete to {}", persistenceId, to); if (lastDelete < to) { @@ -249,10 +249,10 @@ final class SegmentedJournalActor extends AbstractActor { final SegmentedJournalWriter deleteWriter = deleteJournal.writer(); final Indexed entry = deleteWriter.append(lastDelete); deleteWriter.commit(entry.index()); - dataJournal.commitTo(lastDelete); + dataJournal.deleteTo(lastDelete); LOG.debug("{}: compaction started", persistenceId); - dataJournal.compactTo(lastDelete + 1); + dataJournal.compactTo(lastDelete); deleteJournal.compact(entry.index()); LOG.debug("{}: compaction finished", persistenceId); } else { @@ -267,7 +267,7 @@ final class SegmentedJournalActor extends AbstractActor { final Long sequence; if (directory.isDirectory()) { ensureOpen(); - sequence = dataJournal.lastWrittenIndex(); + sequence = dataJournal.lastWrittenSequenceNr(); } else { sequence = 0L; } @@ -276,7 +276,6 @@ final class SegmentedJournalActor extends AbstractActor { message.promise.success(sequence); } - @SuppressWarnings("checkstyle:illegalCatch") private void handleReplayMessages(final ReplayMessages message) { LOG.debug("{}: replaying messages {}", persistenceId, message); ensureOpen(); @@ -291,12 +290,12 @@ final class SegmentedJournalActor extends AbstractActor { ensureOpen(); final long startTicks = System.nanoTime(); - final long start = dataJournal.lastWrittenIndex(); + final long start = dataJournal.lastWrittenSequenceNr(); dataJournal.handleWriteMessages(message); batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS); - messageWriteCount.mark(dataJournal.lastWrittenIndex() - start); + messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start); } private void handleUnknown(final Object message) { @@ -316,8 +315,8 @@ final class SegmentedJournalActor extends AbstractActor { dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory, maxEntrySize, maxSegmentSize); - dataJournal.commitTo(lastDelete); - LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(), - lastDelete); + dataJournal.deleteTo(lastDelete); + LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, + dataJournal.lastWrittenSequenceNr(), lastDelete); } } -- 2.36.6