Improve segmented journal actor metrics 03/111103/6 master
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Wed, 27 Mar 2024 08:52:41 +0000 (10:52 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 23 Apr 2024 17:26:32 +0000 (19:26 +0200)
Update write time marked on actual flush not on flush request.

JIRA: CONTROLLER-2108
Change-Id: I92a66ae775cbae6aeea69bddf654df741f473dbd
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index a3c28cac8addc8c774d64ea6408f1c20251d6d38..73ffab6a053639f89b45470efab8685408efb9a3 100644 (file)
@@ -121,7 +121,6 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
 
         void setFailure(final int index, final Exception cause) {
             results.get(index).success(Optional.of(cause));
 
         void setFailure(final int index, final Exception cause) {
             results.get(index).success(Optional.of(cause));
-
         }
 
         void setSuccess(final int index) {
         }
 
         void setSuccess(final int index) {
@@ -193,7 +192,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
             }
         }
 
             }
         }
 
-        private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
+        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+            UnflushedWrite {
+                requireNonNull(message);
+                requireNonNull(start);
+            }
+        }
+
+        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
         private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
         private final long maxUnflushedBytes;
 
         private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
         private final long maxUnflushedBytes;
 
@@ -220,12 +226,12 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         @Override
         }
 
         @Override
-        void onWrittenMessages(final WrittenMessages message) {
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
             boolean first = unflushedWrites.isEmpty();
             if (first) {
                 unflushedDuration.start();
             }
             boolean first = unflushedWrites.isEmpty();
             if (first) {
                 unflushedDuration.start();
             }
-            unflushedWrites.addLast(message);
+            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
             unflushedBytes = unflushedBytes + message.writtenBytes;
             if (unflushedBytes >= maxUnflushedBytes) {
                 LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
             unflushedBytes = unflushedBytes + message.writtenBytes;
             if (unflushedBytes >= maxUnflushedBytes) {
                 LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
@@ -249,7 +255,7 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
             flushJournal(unflushedBytes, unsyncedSize);
 
             final var sw = Stopwatch.createStarted();
             flushJournal(unflushedBytes, unsyncedSize);
 
             final var sw = Stopwatch.createStarted();
-            unflushedWrites.forEach(WrittenMessages::complete);
+            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
             unflushedWrites.clear();
             unflushedBytes = 0;
             unflushedDuration.reset();
             unflushedWrites.clear();
             unflushedBytes = 0;
             unflushedDuration.reset();
@@ -264,9 +270,9 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         @Override
         }
 
         @Override
-        void onWrittenMessages(final WrittenMessages message) {
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
             flushJournal(message.writtenBytes, 1);
             flushJournal(message.writtenBytes, 1);
-            message.complete();
+            completeWriteMessages(message, started, count);
         }
 
         @Override
         }
 
         @Override
@@ -453,25 +459,29 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final var sw = Stopwatch.createStarted();
+        final var started = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
         final var writtenMessages = dataJournal.handleWriteMessages(message);
         final long start = dataJournal.lastWrittenSequenceNr();
         final var writtenMessages = dataJournal.handleWriteMessages(message);
-        sw.stop();
 
 
-        batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+    }
 
 
+    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(count);
         // log message after statistics are updated
         // log message after statistics are updated
-        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw);
-        onWrittenMessages(writtenMessages);
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+        message.complete();
     }
 
     /**
      * Handle a check of written messages.
      *
      * @param message Messages which were written
     }
 
     /**
      * Handle a check of written messages.
      *
      * @param message Messages which were written
+     * @param started Stopwatch started when the write started
+     * @param count number of writes
      */
      */
-    abstract void onWrittenMessages(WrittenMessages message);
+    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);