Bug 5740: Add Deque-based control-aware mailbox
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MeteredBoundedMailbox.java
index 2a6aac4d79d95ec603591fa84865d0517c35d84d..258672114a9ac2ccce436b8623e4fd3f209796bb 100644 (file)
@@ -14,6 +14,7 @@ import akka.dispatch.BoundedDequeBasedMailbox;
 import akka.dispatch.MailboxType;
 import akka.dispatch.ProducesMessageQueue;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
@@ -21,52 +22,61 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType,
+        ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+    private static final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+    private static final String QUEUE_SIZE = "q-size";
 
-    private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
-
-    private MeteredMessageQueue queue;
     private final Integer capacity;
     private final FiniteDuration pushTimeOut;
-    private final MetricRegistry registry;
-
-    private final String QUEUE_SIZE = "q-size";
 
     public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
 
         CommonConfig commonConfig = new CommonConfig(settings.config());
         this.capacity = commonConfig.getMailBoxCapacity();
         this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
-
-        MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
-        registry = reporter.getMetricsRegistry();
     }
 
 
     @Override
     public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
-        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
-        monitorQueueSize(owner, this.queue);
-        return this.queue;
+        final MeteredMessageQueue queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+        monitorQueueSize(owner, queue);
+        return queue;
     }
 
     private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue));
+    }
+
+    private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
+        return monitoredQueue::size;
+    }
+
+    static <T extends Metric> void registerMetric(scala.Option<ActorRef> owner, String metricName, T metric) {
         if (owner.isEmpty()) {
-            return; //there's no actor to monitor
+           // there's no actor to monitor
+            return;
         }
+
         String actorName = owner.get().path().toStringWithoutAddress();
-        String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
+        String fullName = MetricRegistry.name(actorName, metricName);
 
-        if (registry.getMetrics().containsKey(metricName))
-        {
-            return; //already registered
+        MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+
+        if (registry.getMetrics().containsKey(fullName)) {
+            // already registered
+            return;
         }
 
-        Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
-        registerQueueSizeMetric(metricName, queueSize);
+        try {
+            registry.register(fullName, metric);
+        } catch (IllegalArgumentException e) {
+            // already registered - shouldn't happen here since we check above...
+            LOG.debug("Unable to register '{}' in metrics registry: {}", e);
+        }
     }
 
-
     public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
         private static final long serialVersionUID = 1L;
 
@@ -74,22 +84,5 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
             super(capacity, pushTimeOut);
         }
     }
-
-    private Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
-        return new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                return monitoredQueue.size();
-            }
-        };
-    }
-
-    private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric){
-        try {
-            registry.register(metricName,metric);
-        } catch (IllegalArgumentException e) {
-            LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
-        }
-    }
 }