X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=b6250fc1cccbfbde152a8af7e1e2f4b9b3c6f7cd;hp=0fb09d8231903bbc9b530f488039adc6b8672b90;hb=c9d0ae2a8ce525d150f360cf49f21b7c0187cfbf;hpb=874a18a9ce5dc09bc49922754bf8fb3e981fffb9 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 0fb09d8231..b6250fc1cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -17,6 +17,7 @@ import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Futures; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; @@ -35,17 +36,16 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -98,8 +98,9 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; + private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private final Cache> primaryShardActorSelectionCache; + private Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -121,14 +122,6 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() - .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) - .build(); - - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -150,6 +143,12 @@ public class ActorContext { transactionCommitOperationTimeout = new Timeout(Duration.create( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + + shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); } public DatastoreContext getDatastoreContext() { @@ -202,46 +201,30 @@ public class ActorContext { return schemaContext; } - /** - * Finds the primary shard for the given shard name - * - * @param shardName - * @return - */ - public Optional findPrimaryShard(String shardName) { - String path = findPrimaryPathOrNull(shardName); - if (path == null){ - return Optional.absent(); - } - return Optional.of(actorSystem.actorSelection(path)); - } - public Future findPrimaryShardAsync(final String shardName) { Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), - datastoreContext.getShardInitializationTimeout()); + new FindPrimary(shardName, true), shardInitializationTimeout); return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { - PrimaryFound found = PrimaryFound.fromSerializable(response); + if(response instanceof PrimaryFound) { + PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); return actorSelection; - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found primary shard %s but it's not initialized yet. " + - "Please try again later", shardName)); - } else if(response instanceof PrimaryNotFound) { - throw new PrimaryNotFoundException( - String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; + } else if(response instanceof PrimaryNotFoundException) { + throw (PrimaryNotFoundException)response; + } else if(response instanceof NoShardLeaderException) { + throw (NoShardLeaderException)response; } throw new UnknownMessageException(String.format( @@ -277,7 +260,7 @@ public class ActorContext { */ public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); + new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override @@ -286,10 +269,8 @@ public class ActorContext { LocalShardFound found = (LocalShardFound)response; LOG.debug("Local shard found {}", found.getPath()); return found.getPath(); - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found local shard for %s but it's not initialized yet.", - shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; } else if(response instanceof LocalShardNotFound) { throw new LocalShardNotFoundException( String.format("Local shard for %s does not exist.", shardName)); @@ -301,26 +282,6 @@ public class ActorContext { }, getClientDispatcher()); } - private String findPrimaryPathOrNull(String shardName) { - Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable()); - - if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound found = PrimaryFound.fromSerializable(result); - - LOG.debug("Primary found {}", found.getPrimaryPath()); - return found.getPrimaryPath(); - - } else if (result.getClass().equals(ActorNotInitialized.class)){ - throw new NotInitializedException( - String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName) - ); - - } else { - return null; - } - } - - /** * Executes an operation on a local actor and wait for it's response * @@ -428,16 +389,21 @@ public class ActorContext { * * @param message */ - public void broadcast(Object message){ - for(String shardName : configuration.getAllShardNames()){ - - Optional primary = findPrimaryShard(shardName); - if (primary.isPresent()) { - primary.get().tell(message, ActorRef.noSender()); - } else { - LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", - message.getClass().getSimpleName(), shardName); - } + public void broadcast(final Object message){ + for(final String shardName : configuration.getAllShardNames()){ + + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + LOG.warn("broadcast failed to send message {} to shard {}: {}", + message.getClass().getSimpleName(), shardName, failure); + } else { + primaryShard.tell(message, ActorRef.noSender()); + } + } + }, getClientDispatcher()); } }