X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=ee887b00faca112951952d18715259871904cb84;hp=f6452a19b43cbbf7b3b6b365f3e1da1b7f8d5362;hb=refs%2Fchanges%2F78%2F100478%2F3;hpb=2b702880c19e11be077ddcc540aeacd80ecfcaf6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index f6452a19b4..ee887b00fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -13,20 +13,19 @@ import static com.google.common.base.Verify.verifyNotNull; import akka.dispatch.ExecutionContexts; import akka.dispatch.OnComplete; import akka.util.Timeout; -import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; -import com.google.common.collect.ImmutableBiMap.Builder; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; +import java.util.stream.Stream; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -36,11 +35,13 @@ import scala.concurrent.Future; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding - * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}. + * shard leader is resolved via {@link ActorUtils}. The product of resolution is {@link ShardBackendInfo}. + * + *

+ * This class is thread-safe. * * @author Robert Varga */ -@ThreadSafe final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); @@ -51,19 +52,19 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { @GuardedBy("this") private long nextShard = 1; - private volatile BiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); + private volatile ImmutableBiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { - super(clientId, actorContext); + ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorUtils actorUtils) { + super(clientId, actorUtils); - shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges( + shardAvailabilityChangesRegFuture = ask(actorUtils.getShardManager(), new RegisterForShardAvailabilityChanges( this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES)) .map(reply -> (Registration)reply, ExecutionContexts.global()); shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Registration reply) { + public void onComplete(final Throwable failure, final Registration reply) { if (failure != null) { LOG.error("RegisterForShardAvailabilityChanges failed", failure); } @@ -71,7 +72,7 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { }, ExecutionContexts.global()); } - private void onShardAvailabilityChange(String shardName) { + private void onShardAvailabilityChange(final String shardName) { LOG.debug("onShardAvailabilityChange for {}", shardName); Long cookie = shards.get(shardName); @@ -84,22 +85,26 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { } Long resolveShardForPath(final YangInstanceIdentifier path) { - final String shardName = actorContext().getShardStrategyFactory().getStrategy(path).findShard(path); + return resolveCookie(actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path)); + } + + Stream resolveAllShards() { + return actorUtils().getConfiguration().getAllShardNames().stream() + .sorted() + .map(this::resolveCookie); + } + + private @NonNull Long resolveCookie(final String shardName) { + final Long cookie = shards.get(shardName); + return cookie != null ? cookie : populateShard(shardName); + } + + private synchronized @NonNull Long populateShard(final String shardName) { Long cookie = shards.get(shardName); if (cookie == null) { - synchronized (this) { - cookie = shards.get(shardName); - if (cookie == null) { - cookie = nextShard++; - - Builder builder = ImmutableBiMap.builder(); - builder.putAll(shards); - builder.put(shardName, cookie); - shards = builder.build(); - } - } + cookie = nextShard++; + shards = ImmutableBiMap.builder().putAll(shards).put(shardName, cookie).build(); } - return cookie; } @@ -173,14 +178,14 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { public void close() { shardAvailabilityChangesRegFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Registration reply) { + public void onComplete(final Throwable failure, final Registration reply) { reply.close(); } }, ExecutionContexts.global()); } @Override - public String resolveCookieName(Long cookie) { + public String resolveCookieName(final Long cookie) { return verifyNotNull(shards.inverse().get(cookie), "Unexpected null cookie: %s", cookie); } }