Bug 5740: Add Deque-based control-aware mailbox 18/57818/3
authorTom Pantelis <tompantelis@gmail.com>
Thu, 25 May 2017 13:39:40 +0000 (09:39 -0400)
committerRobert Varga <nite@hq.sk>
Mon, 29 May 2017 09:16:33 +0000 (09:16 +0000)
Since akka persistence uses stashing, it requires a mailbox to be
Deque-based to provide the enqueueFirst method. However, the
control-aware mailboxes provided by akka are not Deque-based so we
need one that is.

Change-Id: I74f214c725eff16aba093aad3f2f6eed80948ee4
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java [new file with mode: 0644]

index 31feac209bbb5cb0d0dc0ee185e4e6caaa0dd5ed..258672114a9ac2ccce436b8623e4fd3f209796bb 100644 (file)
@@ -14,6 +14,7 @@ import akka.dispatch.BoundedDequeBasedMailbox;
 import akka.dispatch.MailboxType;
 import akka.dispatch.ProducesMessageQueue;
 import com.codahale.metrics.Gauge;
 import akka.dispatch.MailboxType;
 import akka.dispatch.ProducesMessageQueue;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
@@ -28,16 +29,12 @@ public class MeteredBoundedMailbox implements MailboxType,
 
     private final Integer capacity;
     private final FiniteDuration pushTimeOut;
 
     private final Integer capacity;
     private final FiniteDuration pushTimeOut;
-    private final MetricRegistry registry;
 
     public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
 
         CommonConfig commonConfig = new CommonConfig(settings.config());
         this.capacity = commonConfig.getMailBoxCapacity();
         this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
 
     public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
 
         CommonConfig commonConfig = new CommonConfig(settings.config());
         this.capacity = commonConfig.getMailBoxCapacity();
         this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
-
-        MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
-        registry = reporter.getMetricsRegistry();
     }
 
 
     }
 
 
@@ -49,21 +46,37 @@ public class MeteredBoundedMailbox implements MailboxType,
     }
 
     private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
     }
 
     private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue));
+    }
+
+    private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
+        return monitoredQueue::size;
+    }
+
+    static <T extends Metric> void registerMetric(scala.Option<ActorRef> owner, String metricName, T metric) {
         if (owner.isEmpty()) {
         if (owner.isEmpty()) {
-            return; //there's no actor to monitor
+           // there's no actor to monitor
+            return;
         }
         }
+
         String actorName = owner.get().path().toStringWithoutAddress();
         String actorName = owner.get().path().toStringWithoutAddress();
-        String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
+        String fullName = MetricRegistry.name(actorName, metricName);
+
+        MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
 
 
-        if (registry.getMetrics().containsKey(metricName)) {
-            return; //already registered
+        if (registry.getMetrics().containsKey(fullName)) {
+            // already registered
+            return;
         }
 
         }
 
-        Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
-        registerQueueSizeMetric(metricName, queueSize);
+        try {
+            registry.register(fullName, metric);
+        } catch (IllegalArgumentException e) {
+            // already registered - shouldn't happen here since we check above...
+            LOG.debug("Unable to register '{}' in metrics registry: {}", e);
+        }
     }
 
     }
 
-
     public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
         private static final long serialVersionUID = 1L;
 
     public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
         private static final long serialVersionUID = 1L;
 
@@ -71,17 +84,5 @@ public class MeteredBoundedMailbox implements MailboxType,
             super(capacity, pushTimeOut);
         }
     }
             super(capacity, pushTimeOut);
         }
     }
-
-    private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
-        return monitoredQueue::size;
-    }
-
-    private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric) {
-        try {
-            registry.register(metricName,metric);
-        } catch (IllegalArgumentException e) {
-            LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
-        }
-    }
 }
 
 }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnboundedDequeBasedControlAwareMailbox.java
new file mode 100644 (file)
index 0000000..46c42ed
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.ControlMessage;
+import akka.dispatch.DequeBasedMessageQueueSemantics;
+import akka.dispatch.Envelope;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import akka.dispatch.UnboundedControlAwareMailbox;
+import com.codahale.metrics.Gauge;
+import com.typesafe.config.Config;
+import java.util.Deque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * An unbounded ControlAwareMailbox that also supports DequeBasedMessageQueueSemantics so it can be used with
+ * persistent actors which use stashing.
+ *
+ * @author Thomas Pantelis
+ */
+public class UnboundedDequeBasedControlAwareMailbox implements MailboxType,
+        ProducesMessageQueue<UnboundedDequeBasedControlAwareMailbox.MessageQueue> {
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedDequeBasedControlAwareMailbox.class);
+    private static final String NORMAL_QUEUE_SIZE = "normal-q-size";
+    private static final String CONTROL_QUEUE_SIZE = "control-q-size";
+    private static final String TOTAL_QUEUE_SIZE = "total-q-size";
+
+    public UnboundedDequeBasedControlAwareMailbox(ActorSystem.Settings settings, Config config) {
+    }
+
+    @Override
+    public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
+        LOG.debug("Creating MessageQueue for {}", owner);
+
+        final MessageQueue queue = new MessageQueue();
+
+        MeteredBoundedMailbox.registerMetric(owner, NORMAL_QUEUE_SIZE, (Gauge<Integer>) () -> queue.queue().size());
+        MeteredBoundedMailbox.registerMetric(owner, CONTROL_QUEUE_SIZE,
+            (Gauge<Integer>) () -> queue.controlQueue().size());
+        MeteredBoundedMailbox.registerMetric(owner, TOTAL_QUEUE_SIZE, (Gauge<Integer>) queue::numberOfMessages);
+
+        return queue;
+    }
+
+    static class MessageQueue extends UnboundedControlAwareMailbox.MessageQueue
+            implements DequeBasedMessageQueueSemantics {
+        private static final long serialVersionUID = 1L;
+
+        private final Deque<Envelope> controlQueue = new ConcurrentLinkedDeque<>();
+        private final Deque<Envelope> queue = new ConcurrentLinkedDeque<>();
+
+        @Override
+        public Queue<Envelope> controlQueue() {
+            return controlQueue;
+        }
+
+        @Override
+        public Queue<Envelope> queue() {
+            return queue;
+        }
+
+        @Override
+        public void enqueueFirst(ActorRef actor, Envelope envelope) {
+            final Object message = envelope.message();
+            LOG.debug("enqueueFirst: actor {}, message type: {}", actor, message.getClass());
+            if (message instanceof ControlMessage) {
+                LOG.debug("Adding {} to the ControlMessage queue", message.getClass());
+                controlQueue.addFirst(envelope);
+            } else {
+                LOG.debug("Adding {} to the normal queue", message.getClass());
+                queue.addFirst(envelope);
+            }
+        }
+    }
+}