Bump versions 9.0.4-SNAPSHOT
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedJournalActor.java
index a3c28cac8addc8c774d64ea6408f1c20251d6d38..0b7808391162327188cc871c69caf75422b88c4c 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -36,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -121,7 +124,6 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
 
         void setFailure(final int index, final Exception cause) {
             results.get(index).success(Optional.of(cause));
-
         }
 
         void setSuccess(final int index) {
@@ -193,7 +195,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
             }
         }
 
-        private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
+        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+            UnflushedWrite {
+                requireNonNull(message);
+                requireNonNull(start);
+            }
+        }
+
+        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
         private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
         private final long maxUnflushedBytes;
 
@@ -220,12 +229,12 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         @Override
-        void onWrittenMessages(final WrittenMessages message) {
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
             boolean first = unflushedWrites.isEmpty();
             if (first) {
                 unflushedDuration.start();
             }
-            unflushedWrites.addLast(message);
+            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
             unflushedBytes = unflushedBytes + message.writtenBytes;
             if (unflushedBytes >= maxUnflushedBytes) {
                 LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
@@ -249,7 +258,7 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
             flushJournal(unflushedBytes, unsyncedSize);
 
             final var sw = Stopwatch.createStarted();
-            unflushedWrites.forEach(WrittenMessages::complete);
+            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
             unflushedWrites.clear();
             unflushedBytes = 0;
             unflushedDuration.reset();
@@ -264,9 +273,9 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         @Override
-        void onWrittenMessages(final WrittenMessages message) {
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
             flushJournal(message.writtenBytes, 1);
-            message.complete();
+            completeWriteMessages(message, started, count);
         }
 
         @Override
@@ -276,10 +285,18 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
-        .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
-        .build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -453,25 +470,29 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final var sw = Stopwatch.createStarted();
+        final var started = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
         final var writtenMessages = dataJournal.handleWriteMessages(message);
-        sw.stop();
 
-        batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+    }
 
+    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(count);
         // log message after statistics are updated
-        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw);
-        onWrittenMessages(writtenMessages);
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+        message.complete();
     }
 
     /**
      * Handle a check of written messages.
      *
      * @param message Messages which were written
+     * @param started Stopwatch started when the write started
+     * @param count number of writes
      */
-    abstract void onWrittenMessages(WrittenMessages message);
+    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
@@ -484,11 +505,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         final var sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-                .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build(), READ_MAPPER, WRITE_MAPPER);
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
-        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);