X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=0fb09d8231903bbc9b530f488039adc6b8672b90;hb=b03c5a20673792f0fb8df847fbaf9c359c7cce1b;hp=7eede29b65690db530fd4b9cfb9acb130365fcb6;hpb=7f44a5812395762479de68d339c3d017db6c0e9d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7eede29b65..0fb09d8231 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,15 +15,19 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -95,6 +99,7 @@ public class ActorContext { private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; private final Dispatchers dispatchers; + private final Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -116,6 +121,14 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -204,6 +217,10 @@ public class ActorContext { } public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + if(ret != null){ + return ret; + } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true).toSerializable(), datastoreContext.getShardInitializationTimeout()); @@ -211,11 +228,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { PrimaryFound found = PrimaryFound.fromSerializable(response); LOG.debug("Primary found {}", found.getPrimaryPath()); - return actorSystem.actorSelection(found.getPrimaryPath()); + ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); + primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); + return actorSelection; } else if(response instanceof ActorNotInitialized) { throw new NotInitializedException( String.format("Found primary shard %s but it's not initialized yet. " + @@ -325,7 +344,7 @@ public class ActorContext { Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -361,7 +380,7 @@ public class ActorContext { LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -555,4 +574,16 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + @VisibleForTesting + Cache> getPrimaryShardActorSelectionCache() { + return primaryShardActorSelectionCache; + } }