X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=6f9bb7fc9feb4ede3a00d06b07d689d615f51458;hb=468b9523001807db03b3a545328ac9bf819278c7;hp=7eede29b65690db530fd4b9cfb9acb130365fcb6;hpb=95771c01e72fca4414c3b4d75734126d7c53f6df;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7eede29b65..6f9bb7fc9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,15 +15,20 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; +import akka.dispatch.Futures; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -31,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; @@ -94,7 +100,9 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; + private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; + private Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -137,6 +145,12 @@ public class ActorContext { transactionCommitOperationTimeout = new Timeout(Duration.create( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + + shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); } public DatastoreContext getDatastoreContext() { @@ -189,33 +203,24 @@ public class ActorContext { return schemaContext; } - /** - * Finds the primary shard for the given shard name - * - * @param shardName - * @return - */ - public Optional findPrimaryShard(String shardName) { - String path = findPrimaryPathOrNull(shardName); - if (path == null){ - return Optional.absent(); - } - return Optional.of(actorSystem.actorSelection(path)); - } - public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + if(ret != null){ + return ret; + } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), - datastoreContext.getShardInitializationTimeout()); + new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout); return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { PrimaryFound found = PrimaryFound.fromSerializable(response); LOG.debug("Primary found {}", found.getPrimaryPath()); - return actorSystem.actorSelection(found.getPrimaryPath()); + ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); + primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); + return actorSelection; } else if(response instanceof ActorNotInitialized) { throw new NotInitializedException( String.format("Found primary shard %s but it's not initialized yet. " + @@ -223,6 +228,8 @@ public class ActorContext { } else if(response instanceof PrimaryNotFound) { throw new PrimaryNotFoundException( String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NoShardLeaderException) { + throw (NoShardLeaderException)response; } throw new UnknownMessageException(String.format( @@ -258,7 +265,7 @@ public class ActorContext { */ public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); + new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override @@ -282,26 +289,6 @@ public class ActorContext { }, getClientDispatcher()); } - private String findPrimaryPathOrNull(String shardName) { - Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable()); - - if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound found = PrimaryFound.fromSerializable(result); - - LOG.debug("Primary found {}", found.getPrimaryPath()); - return found.getPrimaryPath(); - - } else if (result.getClass().equals(ActorNotInitialized.class)){ - throw new NotInitializedException( - String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName) - ); - - } else { - return null; - } - } - - /** * Executes an operation on a local actor and wait for it's response * @@ -325,7 +312,7 @@ public class ActorContext { Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -361,7 +348,7 @@ public class ActorContext { LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -409,16 +396,21 @@ public class ActorContext { * * @param message */ - public void broadcast(Object message){ - for(String shardName : configuration.getAllShardNames()){ - - Optional primary = findPrimaryShard(shardName); - if (primary.isPresent()) { - primary.get().tell(message, ActorRef.noSender()); - } else { - LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", - message.getClass().getSimpleName(), shardName); - } + public void broadcast(final Object message){ + for(final String shardName : configuration.getAllShardNames()){ + + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + LOG.warn("broadcast failed to send message {} to shard {}: {}", + message.getClass().getSimpleName(), shardName, failure); + } else { + primaryShard.tell(message, ActorRef.noSender()); + } + } + }, getClientDispatcher()); } } @@ -555,4 +547,16 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + @VisibleForTesting + Cache> getPrimaryShardActorSelectionCache() { + return primaryShardActorSelectionCache; + } }