import akka.dispatch.MailboxType;
import akka.dispatch.ProducesMessageQueue;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType,
+ ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+ private static final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+ private static final String QUEUE_SIZE = "q-size";
- private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+ private final Integer capacity;
+ private final FiniteDuration pushTimeOut;
- private MeteredMessageQueue queue;
- private Integer capacity;
- private FiniteDuration pushTimeOut;
- private MetricRegistry registry;
-
- private final String QUEUE_SIZE = "q-size";
-
- public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+ public MeteredBoundedMailbox(final ActorSystem.Settings settings, final Config config) {
CommonConfig commonConfig = new CommonConfig(settings.config());
this.capacity = commonConfig.getMailBoxCapacity();
this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
-
- MetricsReporter reporter = MetricsReporter.getInstance();
- registry = reporter.getMetricsRegistry();
}
@Override
- public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
- this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
- monitorQueueSize(owner, this.queue);
- return this.queue;
+ public MeteredMessageQueue create(final scala.Option<ActorRef> owner, final scala.Option<ActorSystem> system) {
+ final MeteredMessageQueue queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+ monitorQueueSize(owner, queue);
+ return queue;
}
- private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ private static void monitorQueueSize(final scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue));
+ }
+
+ private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
+ return monitoredQueue::size;
+ }
+
+ static <T extends Metric> void registerMetric(final scala.Option<ActorRef> owner, final String metricName,
+ final T metric) {
if (owner.isEmpty()) {
- return; //there's no actor to monitor
+ // there's no actor to monitor
+ return;
}
+
String actorName = owner.get().path().toStringWithoutAddress();
- String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
+ String fullName = MetricRegistry.name(actorName, metricName);
- if (registry.getMetrics().containsKey(metricName))
- return; //already registered
+ MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
- Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
- registerQueueSizeMetric(metricName, queueSize);
- }
+ if (registry.getMetrics().containsKey(fullName)) {
+ // already registered
+ return;
+ }
+ try {
+ registry.register(fullName, metric);
+ } catch (IllegalArgumentException e) {
+ // already registered - shouldn't happen here since we check above...
+ LOG.debug("Unable to register '{}' in metrics registry", fullName);
+ }
+ }
public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
private static final long serialVersionUID = 1L;
- public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+ public MeteredMessageQueue(final int capacity, final FiniteDuration pushTimeOut) {
super(capacity, pushTimeOut);
}
}
-
- private Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
- return new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return monitoredQueue.size();
- }
- };
- }
-
- private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric){
- try {
- registry.register(metricName,metric);
- } catch (IllegalArgumentException e) {
- LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
- }
- }
}