From 6d03458bb71b7c0308746577d7e3035d33971c9b Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 25 May 2017 09:39:40 -0400 Subject: [PATCH 1/1] Bug 5740: Add Deque-based control-aware mailbox Since akka persistence uses stashing, it requires a mailbox to be Deque-based to provide the enqueueFirst method. However, the control-aware mailboxes provided by akka are not Deque-based so we need one that is. Change-Id: I74f214c725eff16aba093aad3f2f6eed80948ee4 Signed-off-by: Tom Pantelis --- .../common/actor/MeteredBoundedMailbox.java | 47 +++++----- ...nboundedDequeBasedControlAwareMailbox.java | 87 +++++++++++++++++++ 2 files changed, 111 insertions(+), 23 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java index 31feac209b..258672114a 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java @@ -14,6 +14,7 @@ import akka.dispatch.BoundedDequeBasedMailbox; import akka.dispatch.MailboxType; import akka.dispatch.ProducesMessageQueue; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.typesafe.config.Config; import org.opendaylight.controller.cluster.reporting.MetricsReporter; @@ -28,16 +29,12 @@ public class MeteredBoundedMailbox implements MailboxType, private final Integer capacity; private final FiniteDuration pushTimeOut; - private final MetricRegistry registry; public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { CommonConfig commonConfig = new CommonConfig(settings.config()); this.capacity = commonConfig.getMailBoxCapacity(); this.pushTimeOut = commonConfig.getMailBoxPushTimeout(); - - MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN); - registry = reporter.getMetricsRegistry(); } @@ -49,21 +46,37 @@ public class MeteredBoundedMailbox implements MailboxType, } private void monitorQueueSize(scala.Option owner, final MeteredMessageQueue monitoredQueue) { + registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue)); + } + + private static Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) { + return monitoredQueue::size; + } + + static void registerMetric(scala.Option owner, String metricName, T metric) { if (owner.isEmpty()) { - return; //there's no actor to monitor + // there's no actor to monitor + return; } + String actorName = owner.get().path().toStringWithoutAddress(); - String metricName = MetricRegistry.name(actorName, QUEUE_SIZE); + String fullName = MetricRegistry.name(actorName, metricName); + + MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry(); - if (registry.getMetrics().containsKey(metricName)) { - return; //already registered + if (registry.getMetrics().containsKey(fullName)) { + // already registered + return; } - Gauge queueSize = getQueueSizeGuage(monitoredQueue); - registerQueueSizeMetric(metricName, queueSize); + try { + registry.register(fullName, metric); + } catch (IllegalArgumentException e) { + // already registered - shouldn't happen here since we check above... + LOG.debug("Unable to register '{}' in metrics registry: {}", e); + } } - public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue { private static final long serialVersionUID = 1L; @@ -71,17 +84,5 @@ public class MeteredBoundedMailbox implements MailboxType, super(capacity, pushTimeOut); } } - - private static Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) { - return monitoredQueue::size; - } - - private void registerQueueSizeMetric(String metricName, Gauge metric) { - try { - registry.register(metricName,metric); - } catch (IllegalArgumentException e) { - LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e); - } - } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java new file mode 100644 index 0000000000..46c42ed8f5 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.ControlMessage; +import akka.dispatch.DequeBasedMessageQueueSemantics; +import akka.dispatch.Envelope; +import akka.dispatch.MailboxType; +import akka.dispatch.ProducesMessageQueue; +import akka.dispatch.UnboundedControlAwareMailbox; +import com.codahale.metrics.Gauge; +import com.typesafe.config.Config; +import java.util.Deque; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +/** + * An unbounded ControlAwareMailbox that also supports DequeBasedMessageQueueSemantics so it can be used with + * persistent actors which use stashing. + * + * @author Thomas Pantelis + */ +public class UnboundedDequeBasedControlAwareMailbox implements MailboxType, + ProducesMessageQueue { + private static final Logger LOG = LoggerFactory.getLogger(UnboundedDequeBasedControlAwareMailbox.class); + private static final String NORMAL_QUEUE_SIZE = "normal-q-size"; + private static final String CONTROL_QUEUE_SIZE = "control-q-size"; + private static final String TOTAL_QUEUE_SIZE = "total-q-size"; + + public UnboundedDequeBasedControlAwareMailbox(ActorSystem.Settings settings, Config config) { + } + + @Override + public MessageQueue create(Option owner, Option system) { + LOG.debug("Creating MessageQueue for {}", owner); + + final MessageQueue queue = new MessageQueue(); + + MeteredBoundedMailbox.registerMetric(owner, NORMAL_QUEUE_SIZE, (Gauge) () -> queue.queue().size()); + MeteredBoundedMailbox.registerMetric(owner, CONTROL_QUEUE_SIZE, + (Gauge) () -> queue.controlQueue().size()); + MeteredBoundedMailbox.registerMetric(owner, TOTAL_QUEUE_SIZE, (Gauge) queue::numberOfMessages); + + return queue; + } + + static class MessageQueue extends UnboundedControlAwareMailbox.MessageQueue + implements DequeBasedMessageQueueSemantics { + private static final long serialVersionUID = 1L; + + private final Deque controlQueue = new ConcurrentLinkedDeque<>(); + private final Deque queue = new ConcurrentLinkedDeque<>(); + + @Override + public Queue controlQueue() { + return controlQueue; + } + + @Override + public Queue queue() { + return queue; + } + + @Override + public void enqueueFirst(ActorRef actor, Envelope envelope) { + final Object message = envelope.message(); + LOG.debug("enqueueFirst: actor {}, message type: {}", actor, message.getClass()); + if (message instanceof ControlMessage) { + LOG.debug("Adding {} to the ControlMessage queue", message.getClass()); + controlQueue.addFirst(envelope); + } else { + LOG.debug("Adding {} to the normal queue", message.getClass()); + queue.addFirst(envelope); + } + } + } +} -- 2.36.6