X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=0fb09d8231903bbc9b530f488039adc6b8672b90;hb=51500b537a2d903acf2794091da8f79cbf082d50;hp=cb06c898fd1940743c19b1763f2b173ec3dacbfc;hpb=c68b9f38d84bd45c85b8133c2054ecdd4c413a8f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index cb06c898fd..0fb09d8231 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,15 +15,19 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -47,6 +51,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -84,17 +89,20 @@ public class ActorContext { private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; - private final DatastoreContext datastoreContext; - private final FiniteDuration operationDuration; - private final Timeout operationTimeout; + private DatastoreContext datastoreContext; + private FiniteDuration operationDuration; + private Timeout operationTimeout; private final String selfAddressHostPort; - private final RateLimiter txRateLimiter; + private RateLimiter txRateLimiter; private final MetricRegistry metricRegistry = new MetricRegistry(); private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; - private final Timeout transactionCommitOperationTimeout; + private Timeout transactionCommitOperationTimeout; + private final Dispatchers dispatchers; + private final Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; + private volatile boolean updated; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -110,14 +118,18 @@ public class ActorContext { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; - this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + this.dispatchers = new Dispatchers(actorSystem.dispatchers()); + + setCachedProperties(); + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); - Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get(); @@ -127,6 +139,17 @@ public class ActorContext { transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); jmxReporter.start(); + + } + + private void setCachedProperties() { + txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + + transactionCommitOperationTimeout = new Timeout(Duration.create( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); } public DatastoreContext getDatastoreContext() { @@ -153,7 +176,25 @@ public class ActorContext { this.schemaContext = schemaContext; if(shardManager != null) { - shardManager.tell(new UpdateSchemaContext(schemaContext), null); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + } + } + + public void setDatastoreContext(DatastoreContext context) { + this.datastoreContext = context; + setCachedProperties(); + + // We write the 'updated' volatile to trigger a write memory barrier so that the writes above + // will be published immediately even though they may not be immediately visible to other + // threads due to unsynchronized reads. That's OK though - we're going for eventual + // consistency here as immediately visible updates to these members aren't critical. These + // members could've been made volatile but wanted to avoid volatile reads as these are + // accessed often and updates will be infrequent. + + updated = true; + + if(shardManager != null) { + shardManager.tell(context, ActorRef.noSender()); } } @@ -176,6 +217,10 @@ public class ActorContext { } public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + if(ret != null){ + return ret; + } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true).toSerializable(), datastoreContext.getShardInitializationTimeout()); @@ -183,11 +228,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { PrimaryFound found = PrimaryFound.fromSerializable(response); LOG.debug("Primary found {}", found.getPrimaryPath()); - return actorSystem.actorSelection(found.getPrimaryPath()); + ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); + primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); + return actorSelection; } else if(response instanceof ActorNotInitialized) { throw new NotInitializedException( String.format("Found primary shard %s but it's not initialized yet. " + @@ -200,7 +247,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindPrimary returned unkown response: %s", response)); } - }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } /** @@ -251,7 +298,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindLocalShard returned unkown response: %s", response)); } - }, getActorSystem().dispatcher()); + }, getClientDispatcher()); } private String findPrimaryPathOrNull(String shardName) { @@ -297,7 +344,7 @@ public class ActorContext { Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -333,7 +380,7 @@ public class ActorContext { LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -514,5 +561,29 @@ public class ActorContext { return transactionCommitOperationTimeout; } + /** + * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client + * code on the datastore + * @return + */ + public ExecutionContext getClientDispatcher() { + return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + } + + public String getNotificationDispatcherPath(){ + return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); + } + + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + @VisibleForTesting + Cache> getPrimaryShardActorSelectionCache() { + return primaryShardActorSelectionCache; + } }