X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=f6452a19b43cbbf7b3b6b365f3e1da1b7f8d5362;hb=refs%2Fchanges%2F10%2F78310%2F5;hp=ee549d36118e8f026f1ed6e83c80b842fe5bbf50;hpb=8eee3764486ca996d7e33da7aa76f3d2f38129a2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index ee549d3611..f6452a19b4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,22 +7,31 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; +import static akka.pattern.Patterns.ask; +import static com.google.common.base.Verify.verifyNotNull; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.OnComplete; +import akka.util.Timeout; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -36,7 +45,8 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); private final ConcurrentMap backends = new ConcurrentHashMap<>(); - private final ActorContext actorContext; + + private final Future shardAvailabilityChangesRegFuture; @GuardedBy("this") private long nextShard = 1; @@ -46,11 +56,35 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { // FIXME: we really need just ActorContext.findPrimaryShardAsync() ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { super(clientId, actorContext); - this.actorContext = Preconditions.checkNotNull(actorContext); + + shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges( + this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES)) + .map(reply -> (Registration)reply, ExecutionContexts.global()); + + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Registration reply) { + if (failure != null) { + LOG.error("RegisterForShardAvailabilityChanges failed", failure); + } + } + }, ExecutionContexts.global()); + } + + private void onShardAvailabilityChange(String shardName) { + LOG.debug("onShardAvailabilityChange for {}", shardName); + + Long cookie = shards.get(shardName); + if (cookie == null) { + LOG.debug("No shard cookie found for {}", shardName); + return; + } + + notifyStaleBackendInfoCallbacks(cookie); } Long resolveShardForPath(final YangInstanceIdentifier path) { - final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); + final String shardName = actorContext().getShardStrategyFactory().getStrategy(path).findShard(path); Long cookie = shards.get(shardName); if (cookie == null) { synchronized (this) { @@ -69,7 +103,6 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { return cookie; } - @Override public CompletionStage getBackendInfo(final Long cookie) { /* @@ -127,12 +160,27 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { } LOG.debug("Invalidating backend information {}", staleInfo); - flushCache(staleInfo.getShardName()); + flushCache(staleInfo.getName()); - LOG.trace("Invalidated cache %s", staleInfo); + LOG.trace("Invalidated cache {}", staleInfo); backends.remove(cookie, existing); } return getBackendInfo(cookie); } + + @Override + public void close() { + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Registration reply) { + reply.close(); + } + }, ExecutionContexts.global()); + } + + @Override + public String resolveCookieName(Long cookie) { + return verifyNotNull(shards.inverse().get(cookie), "Unexpected null cookie: %s", cookie); + } }