X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=73f1a8f328d2a671c5699d770b782a860afcc7f7;hp=17d988005fadf32c8209837671d43d0aa2981b60;hb=1447e0132075bbd3013aa41b98384a373bd82d1a;hpb=f9a9cd1ea40d2477ccb16b03c71a87595226595a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 17d988005f..73f1a8f328 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -42,11 +42,14 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +99,7 @@ public class ActorContext { private Timeout transactionCommitOperationTimeout; private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private Cache> primaryShardActorSelectionCache; + private Cache> primaryShardInfoCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -141,7 +144,7 @@ public class ActorContext { shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() + primaryShardInfoCache = CacheBuilder.newBuilder() .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) .build(); } @@ -196,24 +199,24 @@ public class ActorContext { return schemaContext; } - public Future findPrimaryShardAsync(final String shardName) { - Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardInfoCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout); - return future.transform(new Mapper() { + return future.transform(new Mapper() { @Override - public ActorSelection checkedApply(Object response) throws Exception { - if(response instanceof PrimaryFound) { - PrimaryFound found = (PrimaryFound)response; - - LOG.debug("Primary found {}", found.getPrimaryPath()); - ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); - primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); - return actorSelection; + public PrimaryShardInfo checkedApply(Object response) throws Exception { + if(response instanceof RemotePrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null); + } else if(response instanceof LocalPrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + LocalPrimaryShardFound found = (LocalPrimaryShardFound)response; + return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree()); } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -228,6 +231,14 @@ public class ActorContext { }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } + private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath, + DataTree localShardDataTree) { + ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree)); + primaryShardInfoCache.put(shardName, Futures.successful(info)); + return info; + } + /** * Finds a local shard given its shard name and return it's ActorRef * @@ -387,15 +398,15 @@ public class ActorContext { public void broadcast(final Object message){ for(final String shardName : configuration.getAllShardNames()){ - Future primaryFuture = findPrimaryShardAsync(shardName); - primaryFuture.onComplete(new OnComplete() { + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { LOG.warn("broadcast failed to send message {} to shard {}: {}", message.getClass().getSimpleName(), shardName, failure); } else { - primaryShard.tell(message, ActorRef.noSender()); + primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender()); } } }, getClientDispatcher()); @@ -553,7 +564,7 @@ public class ActorContext { } @VisibleForTesting - Cache> getPrimaryShardActorSelectionCache() { - return primaryShardActorSelectionCache; + Cache> getPrimaryShardInfoCache() { + return primaryShardInfoCache; } }