X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=7138a4a6e7b3bc05e610ad323c96ea13eae10281;hp=c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05;hb=2a01b2263488748bd07d224a01b23f5550274447;hpb=52618cba48189dc021cb9d440645a34c772ee007 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index c9fdf38931..7138a4a6e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -18,9 +18,13 @@ import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration; * but should not be passed to actors especially remote actors */ public class ActorContext { - private static final Logger - LOG = LoggerFactory.getLogger(ActorContext.class); - - public static final String MAILBOX = "bounded-mailbox"; - + private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); + private static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; + private static final String METRIC_RATE = "rate"; + private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore"; private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper() { @Override @@ -74,36 +78,47 @@ public class ActorContext { return actualFailure; } }; + public static final String MAILBOX = "bounded-mailbox"; private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; private final DatastoreContext datastoreContext; - private volatile SchemaContext schemaContext; + private final String dataStoreType; private final FiniteDuration operationDuration; private final Timeout operationTimeout; private final String selfAddressHostPort; + private final RateLimiter txRateLimiter; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; + private final Timeout transactionCommitOperationTimeout; + + private volatile SchemaContext schemaContext; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { this(actorSystem, shardManager, clusterWrapper, configuration, - DatastoreContext.newBuilder().build()); + DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE); } public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, String dataStoreType) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.dataStoreType = dataStoreType; + this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), - TimeUnit.SECONDS); + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); + Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -113,6 +128,7 @@ public class ActorContext { } transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); + jmxReporter.start(); } public DatastoreContext getDatastoreContext() { @@ -446,4 +462,59 @@ public class ActorContext { public int getTransactionOutstandingOperationLimit(){ return transactionOutstandingOperationLimit; } + + /** + * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow + * us to create a timer for pretty much anything. + * + * @param operationName + * @return + */ + public Timer getOperationTimer(String operationName){ + final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE); + return metricRegistry.timer(rate); + } + + /** + * Get the type of the data store to which this ActorContext belongs + * + * @return + */ + public String getDataStoreType() { + return dataStoreType; + } + + /** + * Set the number of transaction creation permits that are to be allowed + * + * @param permitsPerSecond + */ + public void setTxCreationLimit(double permitsPerSecond){ + txRateLimiter.setRate(permitsPerSecond); + } + + /** + * Get the current transaction creation rate limit + * @return + */ + public double getTxCreationLimit(){ + return txRateLimiter.getRate(); + } + + /** + * Try to acquire a transaction creation permit. Will block if no permits are available. + */ + public void acquireTxCreationPermit(){ + txRateLimiter.acquire(); + } + + /** + * Return the operation timeout to be used when committing transactions + * @return + */ + public Timeout getTransactionCommitOperationTimeout(){ + return transactionCommitOperationTimeout; + } + + }