X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=93121445f2b0f39232e2cb85c47c89077aacf9ce;hb=70003a00e3a5002cd58594b8a71dead5f00381e3;hp=ee549d36118e8f026f1ed6e83c80b842fe5bbf50;hpb=7a6a43e81663aee0a30952a3d5381d7b97098796;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index ee549d3611..93121445f2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,22 +7,30 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; +import static akka.pattern.Patterns.ask; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.OnComplete; +import akka.util.Timeout; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -36,7 +44,8 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); private final ConcurrentMap backends = new ConcurrentHashMap<>(); - private final ActorContext actorContext; + + private final Future shardAvailabilityChangesRegFuture; @GuardedBy("this") private long nextShard = 1; @@ -46,11 +55,35 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { // FIXME: we really need just ActorContext.findPrimaryShardAsync() ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { super(clientId, actorContext); - this.actorContext = Preconditions.checkNotNull(actorContext); + + shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges( + this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES)) + .map(reply -> (Registration)reply, ExecutionContexts.global()); + + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Registration reply) { + if (failure != null) { + LOG.error("RegisterForShardAvailabilityChanges failed", failure); + } + } + }, ExecutionContexts.global()); + } + + private void onShardAvailabilityChange(String shardName) { + LOG.debug("onShardAvailabilityChange for {}", shardName); + + Long cookie = shards.get(shardName); + if (cookie == null) { + LOG.debug("No shard cookie found for {}", shardName); + return; + } + + notifyStaleBackendInfoCallbacks(cookie); } Long resolveShardForPath(final YangInstanceIdentifier path) { - final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); + final String shardName = actorContext().getShardStrategyFactory().getStrategy(path).findShard(path); Long cookie = shards.get(shardName); if (cookie == null) { synchronized (this) { @@ -69,7 +102,6 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { return cookie; } - @Override public CompletionStage getBackendInfo(final Long cookie) { /* @@ -135,4 +167,14 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { return getBackendInfo(cookie); } + + @Override + public void close() { + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Registration reply) { + reply.close(); + } + }, ExecutionContexts.global()); + } }