X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=26e6318f6d4d14aa5944a909696bbaa8b5f7f207;hp=904dcdf43989bebfeb180b5f56c489cbf035621a;hb=707e2aa73c7314180472539ed4137950d33f5776;hpb=f281dc3e886501dc617e397c8e203732300a8f08 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 904dcdf439..26e6318f6d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -8,16 +8,25 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.RateLimiter; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -38,11 +47,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; -import static akka.pattern.Patterns.ask; /** * The ActorContext class contains utility methods which could be used by @@ -51,11 +59,11 @@ import static akka.pattern.Patterns.ask; * but should not be passed to actors especially remote actors */ public class ActorContext { - private static final Logger - LOG = LoggerFactory.getLogger(ActorContext.class); - - public static final String MAILBOX = "bounded-mailbox"; - + private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); + private static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; + private static final String METRIC_RATE = "rate"; + private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore"; private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper() { @Override @@ -71,15 +79,24 @@ public class ActorContext { return actualFailure; } }; + public static final String MAILBOX = "bounded-mailbox"; private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; private final DatastoreContext datastoreContext; - private volatile SchemaContext schemaContext; private final FiniteDuration operationDuration; private final Timeout operationTimeout; + private final String selfAddressHostPort; + private final RateLimiter txRateLimiter; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); + private final int transactionOutstandingOperationLimit; + private final Timeout transactionCommitOperationTimeout; + private final Dispatchers dispatchers; + + private volatile SchemaContext schemaContext; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -95,10 +112,25 @@ public class ActorContext { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + this.dispatchers = new Dispatchers(actorSystem.dispatchers()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), - TimeUnit.SECONDS); + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); + + + Address selfAddress = clusterWrapper.getSelfAddress(); + if (selfAddress != null && !selfAddress.host().isEmpty()) { + selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get(); + } else { + selfAddressHostPort = null; + } + + transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); + jmxReporter.start(); + } public DatastoreContext getDatastoreContext() { @@ -172,7 +204,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindPrimary returned unkown response: %s", response)); } - }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } /** @@ -223,7 +255,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindLocalShard returned unkown response: %s", response)); } - }, getActorSystem().dispatcher()); + }, getClientDispatcher()); } private String findPrimaryPathOrNull(String shardName) { @@ -268,7 +300,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); return ask(actor, message, timeout); } @@ -303,7 +335,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); return ask(actor, message, timeout); } @@ -330,7 +362,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); actor.tell(message, ActorRef.noSender()); } @@ -370,30 +402,31 @@ public class ActorContext { return operationDuration; } - public boolean isLocalPath(String path) { - String selfAddress = clusterWrapper.getSelfAddress(); - if (path == null || selfAddress == null) { + public boolean isPathLocal(String path) { + if (Strings.isNullOrEmpty(path)) { return false; } - int atIndex1 = path.indexOf("@"); - int atIndex2 = selfAddress.indexOf("@"); + int pathAtIndex = path.indexOf('@'); + if (pathAtIndex == -1) { + //if the path is of local format, then its local and is co-located + return true; - if (atIndex1 == -1 || atIndex2 == -1) { - return false; - } + } else if (selfAddressHostPort != null) { + // self-address and tx actor path, both are of remote path format + int slashIndex = path.indexOf('/', pathAtIndex); + + if (slashIndex == -1) { + return false; + } - int slashIndex1 = path.indexOf("/", atIndex1); - int slashIndex2 = selfAddress.indexOf("/", atIndex2); + String hostPort = path.substring(pathAtIndex + 1, slashIndex); + return hostPort.equals(selfAddressHostPort); - if (slashIndex1 == -1 || slashIndex2 == -1) { + } else { + // self address is local format and tx actor path is remote format return false; } - - String hostPort1 = path.substring(atIndex1, slashIndex1); - String hostPort2 = selfAddress.substring(atIndex2, slashIndex2); - - return hostPort1.equals(hostPort2); } /** @@ -419,4 +452,83 @@ public class ActorContext { return builder.toString(); } + + /** + * Get the maximum number of operations that are to be permitted within a transaction before the transaction + * should begin throttling the operations + * + * Parking reading this configuration here because we need to get to the actor system settings + * + * @return + */ + public int getTransactionOutstandingOperationLimit(){ + return transactionOutstandingOperationLimit; + } + + /** + * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow + * us to create a timer for pretty much anything. + * + * @param operationName + * @return + */ + public Timer getOperationTimer(String operationName){ + final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE); + return metricRegistry.timer(rate); + } + + /** + * Get the type of the data store to which this ActorContext belongs + * + * @return + */ + public String getDataStoreType() { + return datastoreContext.getDataStoreType(); + } + + /** + * Set the number of transaction creation permits that are to be allowed + * + * @param permitsPerSecond + */ + public void setTxCreationLimit(double permitsPerSecond){ + txRateLimiter.setRate(permitsPerSecond); + } + + /** + * Get the current transaction creation rate limit + * @return + */ + public double getTxCreationLimit(){ + return txRateLimiter.getRate(); + } + + /** + * Try to acquire a transaction creation permit. Will block if no permits are available. + */ + public void acquireTxCreationPermit(){ + txRateLimiter.acquire(); + } + + /** + * Return the operation timeout to be used when committing transactions + * @return + */ + public Timeout getTransactionCommitOperationTimeout(){ + return transactionCommitOperationTimeout; + } + + /** + * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client + * code on the datastore + * @return + */ + public ExecutionContext getClientDispatcher() { + return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + } + + public String getNotificationDispatcherPath(){ + return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); + } + }