import akka.dispatch.MailboxType;
import akka.dispatch.ProducesMessageQueue;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
private final Integer capacity;
private final FiniteDuration pushTimeOut;
- private final MetricRegistry registry;
public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
CommonConfig commonConfig = new CommonConfig(settings.config());
this.capacity = commonConfig.getMailBoxCapacity();
this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
-
- MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
- registry = reporter.getMetricsRegistry();
}
}
private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue));
+ }
+
+ private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
+ return monitoredQueue::size;
+ }
+
+ static <T extends Metric> void registerMetric(scala.Option<ActorRef> owner, String metricName, T metric) {
if (owner.isEmpty()) {
- return; //there's no actor to monitor
+ // there's no actor to monitor
+ return;
}
+
String actorName = owner.get().path().toStringWithoutAddress();
- String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
+ String fullName = MetricRegistry.name(actorName, metricName);
+
+ MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
- if (registry.getMetrics().containsKey(metricName)) {
- return; //already registered
+ if (registry.getMetrics().containsKey(fullName)) {
+ // already registered
+ return;
}
- Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
- registerQueueSizeMetric(metricName, queueSize);
+ try {
+ registry.register(fullName, metric);
+ } catch (IllegalArgumentException e) {
+ // already registered - shouldn't happen here since we check above...
+ LOG.debug("Unable to register '{}' in metrics registry: {}", e);
+ }
}
-
public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
private static final long serialVersionUID = 1L;
super(capacity, pushTimeOut);
}
}
-
- private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
- return monitoredQueue::size;
- }
-
- private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric) {
- try {
- registry.register(metricName,metric);
- } catch (IllegalArgumentException e) {
- LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
- }
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.ControlMessage;
+import akka.dispatch.DequeBasedMessageQueueSemantics;
+import akka.dispatch.Envelope;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import akka.dispatch.UnboundedControlAwareMailbox;
+import com.codahale.metrics.Gauge;
+import com.typesafe.config.Config;
+import java.util.Deque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * An unbounded ControlAwareMailbox that also supports DequeBasedMessageQueueSemantics so it can be used with
+ * persistent actors which use stashing.
+ *
+ * @author Thomas Pantelis
+ */
+public class UnboundedDequeBasedControlAwareMailbox implements MailboxType,
+ ProducesMessageQueue<UnboundedDequeBasedControlAwareMailbox.MessageQueue> {
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedDequeBasedControlAwareMailbox.class);
+ private static final String NORMAL_QUEUE_SIZE = "normal-q-size";
+ private static final String CONTROL_QUEUE_SIZE = "control-q-size";
+ private static final String TOTAL_QUEUE_SIZE = "total-q-size";
+
+ public UnboundedDequeBasedControlAwareMailbox(ActorSystem.Settings settings, Config config) {
+ }
+
+ @Override
+ public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
+ LOG.debug("Creating MessageQueue for {}", owner);
+
+ final MessageQueue queue = new MessageQueue();
+
+ MeteredBoundedMailbox.registerMetric(owner, NORMAL_QUEUE_SIZE, (Gauge<Integer>) () -> queue.queue().size());
+ MeteredBoundedMailbox.registerMetric(owner, CONTROL_QUEUE_SIZE,
+ (Gauge<Integer>) () -> queue.controlQueue().size());
+ MeteredBoundedMailbox.registerMetric(owner, TOTAL_QUEUE_SIZE, (Gauge<Integer>) queue::numberOfMessages);
+
+ return queue;
+ }
+
+ static class MessageQueue extends UnboundedControlAwareMailbox.MessageQueue
+ implements DequeBasedMessageQueueSemantics {
+ private static final long serialVersionUID = 1L;
+
+ private final Deque<Envelope> controlQueue = new ConcurrentLinkedDeque<>();
+ private final Deque<Envelope> queue = new ConcurrentLinkedDeque<>();
+
+ @Override
+ public Queue<Envelope> controlQueue() {
+ return controlQueue;
+ }
+
+ @Override
+ public Queue<Envelope> queue() {
+ return queue;
+ }
+
+ @Override
+ public void enqueueFirst(ActorRef actor, Envelope envelope) {
+ final Object message = envelope.message();
+ LOG.debug("enqueueFirst: actor {}, message type: {}", actor, message.getClass());
+ if (message instanceof ControlMessage) {
+ LOG.debug("Adding {} to the ControlMessage queue", message.getClass());
+ controlQueue.addFirst(envelope);
+ } else {
+ LOG.debug("Adding {} to the normal queue", message.getClass());
+ queue.addFirst(envelope);
+ }
+ }
+ }
+}