BUG 2676 : Use notification-dispatcher for DataChangeListener actors
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index f81c2a87cd8694dab4c5c194a3a3be05af6bc282..26e6318f6d4d14aa5944a909696bbaa8b5f7f207 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -17,10 +18,15 @@ import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -41,12 +47,11 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static akka.pattern.Patterns.ask;
-
 /**
  * The ActorContext class contains utility methods which could be used by
  * non-actors (like DistributedDataStore) to work with actors a little more
@@ -54,11 +59,11 @@ import static akka.pattern.Patterns.ask;
  * but should not be passed to actors especially remote actors
  */
 public class ActorContext {
-    private static final Logger
-        LOG = LoggerFactory.getLogger(ActorContext.class);
-
-    public static final String MAILBOX = "bounded-mailbox";
-
+    private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+    private static final String METRIC_RATE = "rate";
+    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -74,16 +79,24 @@ public class ActorContext {
             return actualFailure;
         }
     };
+    public static final String MAILBOX = "bounded-mailbox";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private final DatastoreContext datastoreContext;
-    private volatile SchemaContext schemaContext;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final RateLimiter txRateLimiter;
+    private final MetricRegistry metricRegistry = new MetricRegistry();
+    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
+    private final int transactionOutstandingOperationLimit;
+    private final Timeout transactionCommitOperationTimeout;
+    private final Dispatchers dispatchers;
+
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -99,10 +112,14 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
+        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+        this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
-                TimeUnit.SECONDS);
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
+
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -110,6 +127,10 @@ public class ActorContext {
         } else {
             selfAddressHostPort = null;
         }
+
+        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+        jmxReporter.start();
+
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -183,7 +204,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindPrimary returned unkown response: %s", response));
             }
-        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
     /**
@@ -234,7 +255,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindLocalShard returned unkown response: %s", response));
             }
-        }, getActorSystem().dispatcher());
+        }, getClientDispatcher());
     }
 
     private String findPrimaryPathOrNull(String shardName) {
@@ -279,7 +300,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
         return ask(actor, message, timeout);
     }
 
@@ -314,7 +335,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
         return ask(actor, message, timeout);
     }
@@ -341,7 +362,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
         actor.tell(message, ActorRef.noSender());
     }
@@ -386,14 +407,14 @@ public class ActorContext {
             return false;
         }
 
-        int pathAtIndex = path.indexOf("@");
+        int pathAtIndex = path.indexOf('@');
         if (pathAtIndex == -1) {
             //if the path is of local format, then its local and is co-located
             return true;
 
         } else if (selfAddressHostPort != null) {
             // self-address and tx actor path, both are of remote path format
-            int slashIndex = path.indexOf("/", pathAtIndex);
+            int slashIndex = path.indexOf('/', pathAtIndex);
 
             if (slashIndex == -1) {
                 return false;
@@ -431,4 +452,83 @@ public class ActorContext {
 
         return builder.toString();
     }
+
+    /**
+     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+     * should begin throttling the operations
+     *
+     * Parking reading this configuration here because we need to get to the actor system settings
+     *
+     * @return
+     */
+    public int getTransactionOutstandingOperationLimit(){
+        return transactionOutstandingOperationLimit;
+    }
+
+    /**
+     * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+     * us to create a timer for pretty much anything.
+     *
+     * @param operationName
+     * @return
+     */
+    public Timer getOperationTimer(String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+        return metricRegistry.timer(rate);
+    }
+
+    /**
+     * Get the type of the data store to which this ActorContext belongs
+     *
+     * @return
+     */
+    public String getDataStoreType() {
+        return datastoreContext.getDataStoreType();
+    }
+
+    /**
+     * Set the number of transaction creation permits that are to be allowed
+     *
+     * @param permitsPerSecond
+     */
+    public void setTxCreationLimit(double permitsPerSecond){
+        txRateLimiter.setRate(permitsPerSecond);
+    }
+
+    /**
+     * Get the current transaction creation rate limit
+     * @return
+     */
+    public double getTxCreationLimit(){
+        return txRateLimiter.getRate();
+    }
+
+    /**
+     * Try to acquire a transaction creation permit. Will block if no permits are available.
+     */
+    public void acquireTxCreationPermit(){
+        txRateLimiter.acquire();
+    }
+
+    /**
+     * Return the operation timeout to be used when committing transactions
+     * @return
+     */
+    public Timeout getTransactionCommitOperationTimeout(){
+        return transactionCommitOperationTimeout;
+    }
+
+    /**
+     * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+     * code on the datastore
+     * @return
+     */
+    public ExecutionContext getClientDispatcher() {
+        return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+    }
+
+    public String getNotificationDispatcherPath(){
+        return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+    }
+
 }