Changed key actors to use bounded mailbox
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / common / actor / MeteredBoundedMailbox.java
index 646431522e3c9d4ee3b7b3703839505b38f39bfd..c6d3625ac3ca4ba2cb5e093a64a8681e5c1b4b81 100644 (file)
@@ -11,9 +11,8 @@ package org.opendaylight.controller.common.actor;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.BoundedMailbox;
+import akka.dispatch.BoundedDequeBasedMailbox;
 import akka.dispatch.MailboxType;
-import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
@@ -24,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
 
     private MeteredMessageQueue queue;
     private Integer capacity;
@@ -33,16 +32,18 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
     private MetricsReporter reporter;
 
     private final String QUEUE_SIZE = "queue-size";
+    private final String CAPACITY = "mailbox-capacity";
+    private final String TIMEOUT  = "mailbox-push-timeout-time";
     private final Long DEFAULT_TIMEOUT = 10L;
 
     public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
         Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
-        this.capacity = config.getInt("mailbox-capacity");
+        this.capacity = config.getInt(CAPACITY);
         Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
 
         Long timeout = -1L;
-        if ( config.hasPath("mailbox-push-timeout-time") ){
-            timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+        if ( config.hasPath(TIMEOUT) ){
+            timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
         } else {
             timeout = DEFAULT_TIMEOUT;
         }
@@ -54,7 +55,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
 
 
     @Override
-    public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
         this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
         monitorQueueSize(owner, this.queue);
         return this.queue;
@@ -65,14 +66,15 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
             return; //there's no actor to monitor
         }
         actorPath = owner.get().path();
-        MetricRegistry registry = reporter.getMetricsRegistry();
+        String actorInstanceId = Integer.toString(owner.get().hashCode());
 
-        String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+        MetricRegistry registry = reporter.getMetricsRegistry();
+        String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
 
         if (registry.getMetrics().containsKey(actorName))
             return; //already registered
 
-        reporter.getMetricsRegistry().register(actorName,
+        registry.register(actorName,
                 new Gauge<Integer>() {
                     @Override
                     public Integer getValue() {
@@ -82,7 +84,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
     }
 
 
-    public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
 
         public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
             super(capacity, pushTimeOut);