package org.opendaylight.controller.cluster.datastore.utils;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static akka.pattern.Patterns.ask;
-
/**
* The ActorContext class contains utility methods which could be used by
* non-actors (like DistributedDataStore) to work with actors a little more
* but should not be passed to actors especially remote actors
*/
public class ActorContext {
- private static final Logger
- LOG = LoggerFactory.getLogger(ActorContext.class);
-
- public static final String MAILBOX = "bounded-mailbox";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+ private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+ private static final String METRIC_RATE = "rate";
+ private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
return actualFailure;
}
};
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private volatile SchemaContext schemaContext;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final RateLimiter txRateLimiter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
+ private final int transactionOutstandingOperationLimit;
+ private final Timeout transactionCommitOperationTimeout;
+ private final Dispatchers dispatchers;
+
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+ this.dispatchers = new Dispatchers(actorSystem.dispatchers());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
- TimeUnit.SECONDS);
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
+
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
} else {
selfAddressHostPort = null;
}
+
+ transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+ jmxReporter.start();
+
}
public DatastoreContext getDatastoreContext() {
throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
}
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
/**
throw new UnknownMessageException(String.format(
"FindLocalShard returned unkown response: %s", response));
}
- }, getActorSystem().dispatcher());
+ }, getClientDispatcher());
}
private String findPrimaryPathOrNull(String shardName) {
return builder.toString();
}
+
+ /**
+ * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+ * should begin throttling the operations
+ *
+ * Parking reading this configuration here because we need to get to the actor system settings
+ *
+ * @return
+ */
+ public int getTransactionOutstandingOperationLimit(){
+ return transactionOutstandingOperationLimit;
+ }
+
+ /**
+ * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+ * us to create a timer for pretty much anything.
+ *
+ * @param operationName
+ * @return
+ */
+ public Timer getOperationTimer(String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return metricRegistry.timer(rate);
+ }
+
+ /**
+ * Get the type of the data store to which this ActorContext belongs
+ *
+ * @return
+ */
+ public String getDataStoreType() {
+ return datastoreContext.getDataStoreType();
+ }
+
+ /**
+ * Set the number of transaction creation permits that are to be allowed
+ *
+ * @param permitsPerSecond
+ */
+ public void setTxCreationLimit(double permitsPerSecond){
+ txRateLimiter.setRate(permitsPerSecond);
+ }
+
+ /**
+ * Get the current transaction creation rate limit
+ * @return
+ */
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ /**
+ * Try to acquire a transaction creation permit. Will block if no permits are available.
+ */
+ public void acquireTxCreationPermit(){
+ txRateLimiter.acquire();
+ }
+
+ /**
+ * Return the operation timeout to be used when committing transactions
+ * @return
+ */
+ public Timeout getTransactionCommitOperationTimeout(){
+ return transactionCommitOperationTimeout;
+ }
+
+ /**
+ * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+ * code on the datastore
+ * @return
+ */
+ public ExecutionContext getClientDispatcher() {
+ return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+ }
+
+ public String getNotificationDispatcherPath(){
+ return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+ }
+
}