X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=ee887b00faca112951952d18715259871904cb84;hb=refs%2Fchanges%2F78%2F100478%2F3;hp=ee549d36118e8f026f1ed6e83c80b842fe5bbf50;hpb=8eee3764486ca996d7e33da7aa76f3d2f38129a2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index ee549d3611..ee887b00fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,68 +7,106 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; +import static akka.pattern.Patterns.ask; +import static com.google.common.base.Verify.verifyNotNull; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.OnComplete; +import akka.util.Timeout; import com.google.common.collect.ImmutableBiMap; -import com.google.common.collect.ImmutableBiMap.Builder; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding - * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}. + * shard leader is resolved via {@link ActorUtils}. The product of resolution is {@link ShardBackendInfo}. + * + *

+ * This class is thread-safe. * * @author Robert Varga */ -@ThreadSafe final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); private final ConcurrentMap backends = new ConcurrentHashMap<>(); - private final ActorContext actorContext; + + private final Future shardAvailabilityChangesRegFuture; @GuardedBy("this") private long nextShard = 1; - private volatile BiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); + private volatile ImmutableBiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { - super(clientId, actorContext); - this.actorContext = Preconditions.checkNotNull(actorContext); + ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorUtils actorUtils) { + super(clientId, actorUtils); + + shardAvailabilityChangesRegFuture = ask(actorUtils.getShardManager(), new RegisterForShardAvailabilityChanges( + this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES)) + .map(reply -> (Registration)reply, ExecutionContexts.global()); + + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Registration reply) { + if (failure != null) { + LOG.error("RegisterForShardAvailabilityChanges failed", failure); + } + } + }, ExecutionContexts.global()); } - Long resolveShardForPath(final YangInstanceIdentifier path) { - final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); + private void onShardAvailabilityChange(final String shardName) { + LOG.debug("onShardAvailabilityChange for {}", shardName); + Long cookie = shards.get(shardName); if (cookie == null) { - synchronized (this) { - cookie = shards.get(shardName); - if (cookie == null) { - cookie = nextShard++; - - Builder builder = ImmutableBiMap.builder(); - builder.putAll(shards); - builder.put(shardName, cookie); - shards = builder.build(); - } - } + LOG.debug("No shard cookie found for {}", shardName); + return; } - return cookie; + notifyStaleBackendInfoCallbacks(cookie); } + Long resolveShardForPath(final YangInstanceIdentifier path) { + return resolveCookie(actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path)); + } + + Stream resolveAllShards() { + return actorUtils().getConfiguration().getAllShardNames().stream() + .sorted() + .map(this::resolveCookie); + } + + private @NonNull Long resolveCookie(final String shardName) { + final Long cookie = shards.get(shardName); + return cookie != null ? cookie : populateShard(shardName); + } + + private synchronized @NonNull Long populateShard(final String shardName) { + Long cookie = shards.get(shardName); + if (cookie == null) { + cookie = nextShard++; + shards = ImmutableBiMap.builder().putAll(shards).put(shardName, cookie).build(); + } + return cookie; + } @Override public CompletionStage getBackendInfo(final Long cookie) { @@ -127,12 +165,27 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { } LOG.debug("Invalidating backend information {}", staleInfo); - flushCache(staleInfo.getShardName()); + flushCache(staleInfo.getName()); - LOG.trace("Invalidated cache %s", staleInfo); + LOG.trace("Invalidated cache {}", staleInfo); backends.remove(cookie, existing); } return getBackendInfo(cookie); } + + @Override + public void close() { + shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Registration reply) { + reply.close(); + } + }, ExecutionContexts.global()); + } + + @Override + public String resolveCookieName(final Long cookie) { + return verifyNotNull(shards.inverse().get(cookie), "Unexpected null cookie: %s", cookie); + } }