Custom mailbox that is bounded and instrumented. 73/9973/6
authorAbhishek Kumar <abhishk2@cisco.com>
Fri, 15 Aug 2014 06:15:28 +0000 (23:15 -0700)
committerAbhishek Kumar <abhishk2@cisco.com>
Tue, 19 Aug 2014 01:11:57 +0000 (18:11 -0700)
This is for use with Akka actors. It enables monitoring
of actor's mailbox size via JMX.

Change-Id: Ic1e478e4411f53ff0239d316a1ca02eaa80360ed
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf
opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/pom.xml

index 6db4d3a094f43cb80c461418cfd56d54639dd5a2..a904d6a3ba361788883bebf5ad25ef48199f35d9 100644 (file)
           <artifactId>jsr305</artifactId>
           <version>2.0.1</version>
       </dependency>
           <artifactId>jsr305</artifactId>
           <version>2.0.1</version>
       </dependency>
+
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.1</version>
+      </dependency>
   </dependencies>
 
 </project>
   </dependencies>
 
 </project>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java
new file mode 100644 (file)
index 0000000..6464315
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.common.actor;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.MessageQueue;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.common.reporting.MetricsReporter;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+
+    private MeteredMessageQueue queue;
+    private Integer capacity;
+    private FiniteDuration pushTimeOut;
+    private ActorPath actorPath;
+    private MetricsReporter reporter;
+
+    private final String QUEUE_SIZE = "queue-size";
+    private final Long DEFAULT_TIMEOUT = 10L;
+
+    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+        Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
+        this.capacity = config.getInt("mailbox-capacity");
+        Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
+
+        Long timeout = -1L;
+        if ( config.hasPath("mailbox-push-timeout-time") ){
+            timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+        } else {
+            timeout = DEFAULT_TIMEOUT;
+        }
+        Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
+        this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+
+        reporter = MetricsReporter.getInstance();
+    }
+
+
+    @Override
+    public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+        monitorQueueSize(owner, this.queue);
+        return this.queue;
+    }
+
+    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        if (owner.isEmpty()) {
+            return; //there's no actor to monitor
+        }
+        actorPath = owner.get().path();
+        MetricRegistry registry = reporter.getMetricsRegistry();
+
+        String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+
+        if (registry.getMetrics().containsKey(actorName))
+            return; //already registered
+
+        reporter.getMetricsRegistry().register(actorName,
+                new Gauge<Integer>() {
+                    @Override
+                    public Integer getValue() {
+                        return monitoredQueue.size();
+                    }
+                });
+    }
+
+
+    public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+
+        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+            super(capacity, pushTimeOut);
+        }
+    }
+
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java
new file mode 100644 (file)
index 0000000..5c3e11f
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.common.reporting;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Maintains metrics registry that is provided to reporters.
+ * At the moment only one reporter exists {@code JmxReporter}.
+ * More reporters can be added.
+ * <p/>
+ * The consumers of this class will only be interested in {@code MetricsRegistry}
+ * where metrics for that consumer gets stored.
+ */
+public class MetricsReporter implements AutoCloseable{
+
+    private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
+    private final String DOMAIN = "org.opendaylight.controller";
+
+    public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
+
+    private static MetricsReporter inst = new MetricsReporter();
+
+    private MetricsReporter(){
+        jmxReporter.start();
+    }
+
+    public static MetricsReporter getInstance(){
+        return inst;
+    }
+
+    public MetricRegistry getMetricsRegistry(){
+        return METRICS_REGISTRY;
+    }
+
+    @Override
+    public void close() throws Exception {
+        jmxReporter.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java
new file mode 100644 (file)
index 0000000..bfdb093
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.DeadLetter;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MeteredBoundedMailboxTest {
+
+    private static ActorSystem actorSystem;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @Before
+    public void setUp() throws Exception {
+        actorSystem = ActorSystem.create("testsystem");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+       if (actorSystem != null)
+           actorSystem.shutdown();
+    }
+
+    @Test
+    public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+        final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
+        actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
+
+
+        final FiniteDuration ONE_SEC = new FiniteDuration(1, TimeUnit.SECONDS);
+        String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
+        ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+                                                     "pingpongactor");
+
+        actorSystem.mailboxes().settings();
+        lock.lock();
+        //queue capacity = 10
+        //need to send 12 messages; 1 message is dequeued and actor waits on lock,
+        //2nd to 11th messages are put on the queue
+        //12th message is sent to dead letter.
+        for (int i=0;i<12;i++){
+            pingPongActor.tell("ping", mockReceiver.getRef());
+        }
+
+        mockReceiver.expectMsgClass(ONE_SEC, DeadLetter.class);
+
+        lock.unlock();
+
+        Object[] eleven = mockReceiver.receiveN(11, ONE_SEC);
+    }
+
+    /**
+     * For testing
+     */
+    public static class PingPongActor extends UntypedActor{
+
+        ReentrantLock lock;
+
+        private PingPongActor(ReentrantLock lock){
+            this.lock = lock;
+        }
+
+        public static Props props(final ReentrantLock lock){
+            return Props.create(new Creator<PingPongActor>(){
+                @Override
+                public PingPongActor create() throws Exception {
+                    return new PingPongActor(lock);
+                }
+            });
+        }
+
+        @Override
+        public void onReceive(Object message) throws Exception {
+            lock.lock();
+            if ("ping".equals(message))
+                getSender().tell("pong", getSelf());
+        }
+    }
+}
\ No newline at end of file
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0392dec3dd43075c2df343526bafa5cbd59e4ada 100644 (file)
@@ -0,0 +1,8 @@
+testsystem {
+
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 10
+    mailbox-push-timeout-time = 100ms
+  }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf
new file mode 100644 (file)
index 0000000..3481bae
--- /dev/null
@@ -0,0 +1,8 @@
+testsystem {
+
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
+}
\ No newline at end of file
index 648e8d23d06892f8964ed65e322b7cb988ee8194..b47222ffa26e65ceb4599f2c2319a9491d2599f6 100644 (file)
         <version>1.1-SNAPSHOT</version>
     </dependency>
 
         <version>1.1-SNAPSHOT</version>
     </dependency>
 
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.1</version>
+      </dependency>
     <!-- Test Dependencies -->
     <dependency>
       <groupId>junit</groupId>
     <!-- Test Dependencies -->
     <dependency>
       <groupId>junit</groupId>
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
+                *metrics*;
                 !sal*;
                 !*config-api*;
                 !*testkit*;
                 !sal*;
                 !*config-api*;
                 !*testkit*;