From: Moiz Raja Date: Tue, 19 Aug 2014 16:54:17 +0000 (+0000) Subject: Merge "Custom mailbox that is bounded and instrumented." X-Git-Tag: release/helium~265 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d206d27042eef2185c875f85cf6eac61a1bd77c4;hp=341ef75cc16912e5b51a260ce1a0a57076d4da58 Merge "Custom mailbox that is bounded and instrumented." --- diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml index 6d6e440579..4419d19f52 100644 --- a/opendaylight/md-sal/sal-clustering-commons/pom.xml +++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml @@ -160,6 +160,12 @@ jsr305 2.0.1 + + + com.codahale.metrics + metrics-core + 3.0.1 + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java new file mode 100644 index 0000000000..646431522e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.common.actor; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.BoundedMailbox; +import akka.dispatch.MailboxType; +import akka.dispatch.MessageQueue; +import akka.dispatch.ProducesMessageQueue; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import org.opendaylight.controller.common.reporting.MetricsReporter; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { + + private MeteredMessageQueue queue; + private Integer capacity; + private FiniteDuration pushTimeOut; + private ActorPath actorPath; + private MetricsReporter reporter; + + private final String QUEUE_SIZE = "queue-size"; + private final Long DEFAULT_TIMEOUT = 10L; + + public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { + Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" ); + this.capacity = config.getInt("mailbox-capacity"); + Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0"); + + Long timeout = -1L; + if ( config.hasPath("mailbox-push-timeout-time") ){ + timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS); + } else { + timeout = DEFAULT_TIMEOUT; + } + Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0"); + this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS); + + reporter = MetricsReporter.getInstance(); + } + + + @Override + public MessageQueue create(final scala.Option owner, scala.Option system) { + this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut); + monitorQueueSize(owner, this.queue); + return this.queue; + } + + private void monitorQueueSize(scala.Option owner, final MeteredMessageQueue monitoredQueue) { + if (owner.isEmpty()) { + return; //there's no actor to monitor + } + actorPath = owner.get().path(); + MetricRegistry registry = reporter.getMetricsRegistry(); + + String actorName = registry.name(actorPath.toString(), QUEUE_SIZE); + + if (registry.getMetrics().containsKey(actorName)) + return; //already registered + + reporter.getMetricsRegistry().register(actorName, + new Gauge() { + @Override + public Integer getValue() { + return monitoredQueue.size(); + } + }); + } + + + public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue { + + public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) { + super(capacity, pushTimeOut); + } + } + +} + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java new file mode 100644 index 0000000000..5c3e11f8b8 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.common.reporting; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; + +/** + * Maintains metrics registry that is provided to reporters. + * At the moment only one reporter exists {@code JmxReporter}. + * More reporters can be added. + *

+ * The consumers of this class will only be interested in {@code MetricsRegistry} + * where metrics for that consumer gets stored. + */ +public class MetricsReporter implements AutoCloseable{ + + private final MetricRegistry METRICS_REGISTRY = new MetricRegistry(); + private final String DOMAIN = "org.opendaylight.controller"; + + public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build(); + + private static MetricsReporter inst = new MetricsReporter(); + + private MetricsReporter(){ + jmxReporter.start(); + } + + public static MetricsReporter getInstance(){ + return inst; + } + + public MetricRegistry getMetricsRegistry(){ + return METRICS_REGISTRY; + } + + @Override + public void close() throws Exception { + jmxReporter.close(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java new file mode 100644 index 0000000000..bfdb0930b1 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.common.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.DeadLetter; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import akka.testkit.JavaTestKit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class MeteredBoundedMailboxTest { + + private static ActorSystem actorSystem; + private final ReentrantLock lock = new ReentrantLock(); + + @Before + public void setUp() throws Exception { + actorSystem = ActorSystem.create("testsystem"); + } + + @After + public void tearDown() throws Exception { + if (actorSystem != null) + actorSystem.shutdown(); + } + + @Test + public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException { + final JavaTestKit mockReceiver = new JavaTestKit(actorSystem); + actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class); + + + final FiniteDuration ONE_SEC = new FiniteDuration(1, TimeUnit.SECONDS); + String boundedMailBox = actorSystem.name() + ".bounded-mailbox"; + ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox), + "pingpongactor"); + + actorSystem.mailboxes().settings(); + lock.lock(); + //queue capacity = 10 + //need to send 12 messages; 1 message is dequeued and actor waits on lock, + //2nd to 11th messages are put on the queue + //12th message is sent to dead letter. + for (int i=0;i<12;i++){ + pingPongActor.tell("ping", mockReceiver.getRef()); + } + + mockReceiver.expectMsgClass(ONE_SEC, DeadLetter.class); + + lock.unlock(); + + Object[] eleven = mockReceiver.receiveN(11, ONE_SEC); + } + + /** + * For testing + */ + public static class PingPongActor extends UntypedActor{ + + ReentrantLock lock; + + private PingPongActor(ReentrantLock lock){ + this.lock = lock; + } + + public static Props props(final ReentrantLock lock){ + return Props.create(new Creator(){ + @Override + public PingPongActor create() throws Exception { + return new PingPongActor(lock); + } + }); + } + + @Override + public void onReceive(Object message) throws Exception { + lock.lock(); + if ("ping".equals(message)) + getSender().tell("pong", getSelf()); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf index e69de29bb2..0392dec3dd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf @@ -0,0 +1,8 @@ +testsystem { + + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 10 + mailbox-push-timeout-time = 100ms + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf new file mode 100644 index 0000000000..3481bae8ae --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf @@ -0,0 +1,8 @@ +testsystem { + + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 08262c1088..9c5129d6a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -135,6 +135,11 @@ 1.1-SNAPSHOT + + com.codahale.metrics + metrics-core + 3.0.1 + junit @@ -172,6 +177,7 @@ sal-clustering-commons; sal-akka-raft; + *metrics*; !sal*; !*config-api*; !*testkit*;