X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=26e6318f6d4d14aa5944a909696bbaa8b5f7f207;hp=cb06c898fd1940743c19b1763f2b173ec3dacbfc;hb=707e2aa73c7314180472539ed4137950d33f5776;hpb=c64ef5f44f131976c20fcf8ced56627f81091838 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index cb06c898fd..26e6318f6d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -47,6 +47,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -93,6 +94,7 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private final Timeout transactionCommitOperationTimeout; + private final Dispatchers dispatchers; private volatile SchemaContext schemaContext; @@ -111,6 +113,7 @@ public class ActorContext { this.configuration = configuration; this.datastoreContext = datastoreContext; this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + this.dispatchers = new Dispatchers(actorSystem.dispatchers()); operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); @@ -127,6 +130,7 @@ public class ActorContext { transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); jmxReporter.start(); + } public DatastoreContext getDatastoreContext() { @@ -200,7 +204,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindPrimary returned unkown response: %s", response)); } - }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } /** @@ -251,7 +255,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindLocalShard returned unkown response: %s", response)); } - }, getActorSystem().dispatcher()); + }, getClientDispatcher()); } private String findPrimaryPathOrNull(String shardName) { @@ -514,5 +518,17 @@ public class ActorContext { return transactionCommitOperationTimeout; } + /** + * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client + * code on the datastore + * @return + */ + public ExecutionContext getClientDispatcher() { + return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + } + + public String getNotificationDispatcherPath(){ + return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); + } }