import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
-import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
import io.atomix.storage.StorageLevel;
import java.util.function.Consumer;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
import scala.concurrent.Future;
import scala.concurrent.Promise;
+import scala.jdk.javaapi.CollectionConverters;
/**
* This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
- // Tracks the time it took us to write a batch of messages
- private final Timer batchWriteTime = new Timer();
- // Tracks the number of individual messages written
- private final Meter messageWriteCount = new Meter();
- // Tracks the size distribution of messages for last 5 minutes
- private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
-
private final String persistenceId;
private final StorageLevel storage;
private final int maxSegmentSize;
private final int maxEntrySize;
private final File directory;
+ // Tracks the time it took us to write a batch of messages
+ private Timer batchWriteTime;
+ // Tracks the number of individual messages written
+ private Meter messageWriteCount;
+ // Tracks the size distribution of messages (registry-default reservoir; note this replaces the previous 5-minute sliding time window)
+ private Histogram messageSize;
+
private SegmentedJournal<DataJournalEntry> dataJournal;
private SegmentedJournal<Long> deleteJournal;
private long lastDelete;
public void preStart() throws Exception {
LOG.debug("{}: actor starting", persistenceId);
super.preStart();
+
+ final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+ final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+
+ batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
+ messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
+ messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
}
@Override
dataJournal.writer().commit(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compact(lastDelete);
+ dataJournal.compact(lastDelete + 1);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
message.promise.success(null);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
final Long sequence;
}
private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
- // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
- final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
- while (it.hasNext()) {
- final PersistentRepr repr = it.next();
+ for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
final Object payload = repr.payload();
if (!(payload instanceof Serializable)) {
throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.index();
+ lastDelete = lastEntry == null ? 0 : lastEntry.entry();
dataJournal = SegmentedJournal.<DataJournalEntry>builder()
.withStorageLevel(storage).withDirectory(directory).withName("data")