Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedJournalActor.java
index 6cc5d9e672b37c7372928b855ee91a19bf5c6da0..4e0a54c4b24aff92fcbf3c80d4c98ba19e0dd6e0 100644 (file)
@@ -17,7 +17,7 @@ import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.MoreObjects;
 import io.atomix.storage.StorageLevel;
@@ -35,12 +35,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
+import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
@@ -137,19 +138,19 @@ final class SegmentedJournalActor extends AbstractActor {
     private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
 
-    // Tracks the time it took us to write a batch of messages
-    private final Timer batchWriteTime = new Timer();
-    // Tracks the number of individual messages written
-    private final Meter messageWriteCount = new Meter();
-    // Tracks the size distribution of messages for last 5 minutes
-    private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
-
     private final String persistenceId;
     private final StorageLevel storage;
     private final int maxSegmentSize;
     private final int maxEntrySize;
     private final File directory;
 
+    // Tracks the time it took us to write a batch of messages
+    private Timer batchWriteTime;
+    // Tracks the number of individual messages written
+    private Meter messageWriteCount;
+    // Tracks the size distribution of messages
+    private Histogram messageSize;
+
     private SegmentedJournal<DataJournalEntry> dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
@@ -187,6 +188,13 @@ final class SegmentedJournalActor extends AbstractActor {
     public void preStart() throws Exception {
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
+
+        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+
+        batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
+        messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
+        messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
     }
 
     @Override
@@ -236,7 +244,7 @@ final class SegmentedJournalActor extends AbstractActor {
             dataJournal.writer().commit(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete);
+            dataJournal.compact(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -246,7 +254,6 @@ final class SegmentedJournalActor extends AbstractActor {
         message.promise.success(null);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
         LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
         final Long sequence;
@@ -324,10 +331,7 @@ final class SegmentedJournalActor extends AbstractActor {
     }
 
     private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
-        // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
-        final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
-        while (it.hasNext()) {
-            final PersistentRepr repr = it.next();
+        for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
             final Object payload = repr.payload();
             if (!(payload instanceof Serializable)) {
                 throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
@@ -358,7 +362,7 @@ final class SegmentedJournalActor extends AbstractActor {
         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
                 .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.index();
+        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
         dataJournal = SegmentedJournal.<DataJournalEntry>builder()
                 .withStorageLevel(storage).withDirectory(directory).withName("data")