X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-segmented-journal%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fakka%2Fsegjournal%2FSegmentedJournalActor.java;h=4e0a54c4b24aff92fcbf3c80d4c98ba19e0dd6e0;hp=6cc5d9e672b37c7372928b855ee91a19bf5c6da0;hb=4b59df006c79ffb8119152e5a8bc6aadd276c031;hpb=52fb7d830e43a9849c728c7b0d36470afe686fb5 diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 6cc5d9e672..4e0a54c4b2 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -17,7 +17,7 @@ import akka.persistence.AtomicWrite; import akka.persistence.PersistentRepr; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; -import com.codahale.metrics.SlidingTimeWindowReservoir; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.base.MoreObjects; import io.atomix.storage.StorageLevel; @@ -35,12 +35,13 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence; import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence; +import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; +import org.opendaylight.controller.cluster.reporting.MetricsReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.Iterator; -import scala.collection.SeqLike; import scala.concurrent.Future; import scala.concurrent.Promise; +import scala.jdk.javaapi.CollectionConverters; /** * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s: @@ -137,19 +138,19 @@ final class SegmentedJournalActor extends AbstractActor { private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build(); private static final int DELETE_SEGMENT_SIZE = 64 * 1024; - // Tracks the time it took us to write a batch of messages - private final Timer batchWriteTime = new Timer(); - // Tracks the number of individual messages written - private final Meter messageWriteCount = new Meter(); - // Tracks the size distribution of messages for last 5 minutes - private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES)); - private final String persistenceId; private final StorageLevel storage; private final int maxSegmentSize; private final int maxEntrySize; private final File directory; + // Tracks the time it took us to write a batch of messages + private Timer batchWriteTime; + // Tracks the number of individual messages written + private Meter messageWriteCount; + // Tracks the size distribution of messages + private Histogram messageSize; + private SegmentedJournal dataJournal; private SegmentedJournal deleteJournal; private long lastDelete; @@ -187,6 +188,13 @@ final class SegmentedJournalActor extends AbstractActor { public void preStart() throws Exception { LOG.debug("{}: actor starting", persistenceId); super.preStart(); + + final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry(); + final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName(); + + batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime")); + messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount")); + messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize")); } @Override @@ -236,7 +244,7 @@ final class SegmentedJournalActor extends AbstractActor { dataJournal.writer().commit(lastDelete); LOG.debug("{}: compaction started", persistenceId); - dataJournal.compact(lastDelete); + dataJournal.compact(lastDelete + 1); deleteJournal.compact(entry.index()); LOG.debug("{}: compaction finished", persistenceId); } else { @@ -246,7 +254,6 @@ final class SegmentedJournalActor extends AbstractActor { message.promise.success(null); } - @SuppressWarnings("checkstyle:illegalCatch") private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) { LOG.debug("{}: looking for highest sequence on {}", persistenceId, message); final Long sequence; @@ -324,10 +331,7 @@ final class SegmentedJournalActor extends AbstractActor { } private void writeRequest(final SegmentedJournalWriter writer, final AtomicWrite request) { - // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276 - final Iterator it = ((SeqLike) request.payload()).iterator(); - while (it.hasNext()) { - final PersistentRepr repr = it.next(); + for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) { final Object payload = repr.payload(); if (!(payload instanceof Serializable)) { throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass()); @@ -358,7 +362,7 @@ final class SegmentedJournalActor extends AbstractActor { deleteJournal = SegmentedJournal.builder().withDirectory(directory).withName("delete") .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build(); final Indexed lastEntry = deleteJournal.writer().getLastEntry(); - lastDelete = lastEntry == null ? 0 : lastEntry.index(); + lastDelete = lastEntry == null ? 0 : lastEntry.entry(); dataJournal = SegmentedJournal.builder() .withStorageLevel(storage).withDirectory(directory).withName("data")