2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.common.actor;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.dispatch.BoundedDequeBasedMailbox;
14 import akka.dispatch.MailboxType;
15 import akka.dispatch.ProducesMessageQueue;
16 import com.codahale.metrics.Gauge;
17 import com.codahale.metrics.MetricRegistry;
18 import com.typesafe.config.Config;
19 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import scala.concurrent.duration.FiniteDuration;
24 public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
26 private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
28 private MeteredMessageQueue queue;
29 private final Integer capacity;
30 private final FiniteDuration pushTimeOut;
31 private final MetricRegistry registry;
33 private final String QUEUE_SIZE = "q-size";
35 public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
37 CommonConfig commonConfig = new CommonConfig(settings.config());
38 this.capacity = commonConfig.getMailBoxCapacity();
39 this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
41 MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
42 registry = reporter.getMetricsRegistry();
47 public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
48 this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
49 monitorQueueSize(owner, this.queue);
53 private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
54 if (owner.isEmpty()) {
55 return; //there's no actor to monitor
57 String actorName = owner.get().path().toStringWithoutAddress();
58 String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
60 if (registry.getMetrics().containsKey(metricName))
62 return; //already registered
65 Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
66 registerQueueSizeMetric(metricName, queueSize);
70 public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
71 private static final long serialVersionUID = 1L;
73 public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
74 super(capacity, pushTimeOut);
78 private Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
79 return new Gauge<Integer>() {
81 public Integer getValue() {
82 return monitoredQueue.size();
87 private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric){
89 registry.register(metricName,metric);
90 } catch (IllegalArgumentException e) {
91 LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);