From: Ruslan Kashapov Date: Wed, 27 Mar 2024 08:52:41 +0000 (+0200) Subject: Improve segmented journal actor metrics X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fheads%2Fmaster Improve segmented journal actor metrics Update write time marked on actual flush not on flush request. JIRA: CONTROLLER-2108 Change-Id: I92a66ae775cbae6aeea69bddf654df741f473dbd Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index a3c28cac8a..73ffab6a05 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -121,7 +121,6 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { void setFailure(final int index, final Exception cause) { results.get(index).success(Optional.of(cause)); - } void setSuccess(final int index) { @@ -193,7 +192,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { } } - private final ArrayDeque unflushedWrites = new ArrayDeque<>(); + private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) { + UnflushedWrite { + requireNonNull(message); + requireNonNull(start); + } + } + + private final ArrayDeque unflushedWrites = new ArrayDeque<>(); private final Stopwatch unflushedDuration = Stopwatch.createUnstarted(); private final long maxUnflushedBytes; @@ -220,12 +226,12 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { } @Override - void onWrittenMessages(final WrittenMessages message) { + void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) { boolean first = unflushedWrites.isEmpty(); if (first) { unflushedDuration.start(); } - unflushedWrites.addLast(message); + unflushedWrites.addLast(new UnflushedWrite(message, started, count)); unflushedBytes = unflushedBytes + message.writtenBytes; if (unflushedBytes >= maxUnflushedBytes) { LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes); @@ -249,7 +255,7 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { flushJournal(unflushedBytes, unsyncedSize); final var sw = Stopwatch.createStarted(); - unflushedWrites.forEach(WrittenMessages::complete); + unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count)); unflushedWrites.clear(); unflushedBytes = 0; unflushedDuration.reset(); @@ -264,9 +270,9 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { } @Override - void onWrittenMessages(final WrittenMessages message) { + void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) { flushJournal(message.writtenBytes, 1); - message.complete(); + completeWriteMessages(message, started, count); } @Override @@ -453,25 +459,29 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { private void handleWriteMessages(final WriteMessages message) { ensureOpen(); - final var sw = Stopwatch.createStarted(); + final var started = Stopwatch.createStarted(); final long start = dataJournal.lastWrittenSequenceNr(); final var writtenMessages = dataJournal.handleWriteMessages(message); - sw.stop(); - batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start); + onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start); + } + final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) { + batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + messageWriteCount.mark(count); // log message after statistics are updated - LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw); - onWrittenMessages(writtenMessages); + LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started); + message.complete(); } /** * Handle a check of written messages. * * @param message Messages which were written + * @param started Stopwatch started when the write started + * @param count number of writes */ - abstract void onWrittenMessages(WrittenMessages message); + abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count); private void handleUnknown(final Object message) { LOG.error("{}: Received unknown message {}", persistenceId, message);