X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=dde2292241ef55344e64bbd2793a03038c13bfa3;hb=cc5009b8f3ea91f64ee48cda815c6a5e73a8a1af;hp=0633b68f1f992bff6e8095ca69f2790e54e3a0c4;hpb=b0067e0a4bfa955f15c6259e019f954687264eff;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
index 0633b68f1f..dde2292241 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
@@ -7,30 +7,22 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.ABIVersion;
-import org.opendaylight.controller.cluster.access.client.BackendInfo;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -39,20 +31,11 @@ import scala.concurrent.ExecutionContext;
  *
  * @author Robert Varga
  */
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
-            ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
-    private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+@ThreadSafe
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
-    /**
-     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
-     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
-     * non-operational.
-     */
-    // TODO: maybe make this configurable somehow?
-    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
-
+    private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
     private final ActorContext actorContext;
 
     @GuardedBy("this")
@@ -61,22 +44,11 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
-    ModuleShardBackendResolver(final ActorContext actorContext) {
+    ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+        super(clientId, actorContext);
         this.actorContext = Preconditions.checkNotNull(actorContext);
     }
 
-    @Override
-    protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
-        LOG.trace("Initiated invalidation of backend information {}", info);
-        info.thenAccept(this::invalidate);
-    }
-
-    private void invalidate(final BackendInfo result) {
-        Preconditions.checkArgument(result instanceof ShardBackendInfo);
-        LOG.debug("Invalidating backend information {}", result);
-        actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
-    }
-
     Long resolveShardForPath(final YangInstanceIdentifier path) {
         final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
         Long cookie = shards.get(shardName);
@@ -86,10 +58,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
                 if (cookie == null) {
                     cookie = nextShard++;
 
-                    Builder<String, Long> b = ImmutableBiMap.builder();
-                    b.putAll(shards);
-                    b.put(shardName, cookie);
-                    shards = b.build();
+                    Builder<String, Long> builder = ImmutableBiMap.builder();
+                    builder.putAll(shards);
+                    builder.put(shardName, cookie);
+                    shards = builder.build();
                 }
             }
         }
@@ -97,47 +69,39 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
         return cookie;
     }
 
-    @Override
-    protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    private ShardState resolveBackendInfo(final Long cookie) {
        final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return NULL_FUTURE;
+            return null;
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
 
-        actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
-            @Override
-            public void onComplete(final Throwable t, final PrimaryShardInfo v) {
-                if (t != null) {
-                    ret.completeExceptionally(t);
-                } else {
-                    ret.complete(createBackendInfo(v, shardName, cookie));
-                }
-            }
-        }, DIRECT_EXECUTION_CONTEXT);
+        return resolveBackendInfo(shardName, cookie);
+    }
 
-        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
-        return ret;
+    @Override
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
     }
 
-    private static ABIVersion toABIVersion(final short version) {
-        switch (version) {
-            case DataStoreVersions.BORON_VERSION:
-                return ABIVersion.BORON;
-        }
+    @Override
+    public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
+            final ShardBackendInfo staleInfo) {
+        final ShardState existing = backends.get(cookie);
+        if (existing != null) {
+            if (!staleInfo.equals(existing.getResult())) {
+                return existing.getStage();
+            }
 
-        throw new IllegalArgumentException("Unsupported version " + version);
-    }
+            LOG.debug("Invalidating backend information {}", staleInfo);
+            flushCache(staleInfo.getShardName());
 
-    private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
-        Preconditions.checkArgument(result instanceof PrimaryShardInfo);
-        final PrimaryShardInfo info = (PrimaryShardInfo) result;
+            LOG.trace("Invalidated cache %s", staleInfo);
+            backends.remove(cookie, existing);
+        }
 
-        LOG.debug("Creating backend information for {}", info);
-        return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
-            toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
-            info.getLocalShardDataTree());
-    }
+        return getBackendInfo(cookie);
+    }
 }
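
Note on the pattern introduced above: the rewritten resolver keeps per-cookie resolution state in a ConcurrentMap, so getBackendInfo() starts resolution at most once per cookie via computeIfAbsent(), while refreshBackendInfo() evicts a cached entry only while it still holds the caller's stale result and then re-resolves. The sketch below illustrates that lookup/refresh flow in isolation; it is not the OpenDaylight API. Backend, BackendState and resolve() are simplified stand-ins for ShardBackendInfo, the ShardState holder and the asynchronous primary-shard lookup.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class BackendCacheSketch {
    // Simplified stand-in for ShardBackendInfo: just the shard name.
    static final class Backend {
        final String shardName;

        Backend(final String shardName) {
            this.shardName = shardName;
        }
    }

    // Simplified stand-in for ShardState: pairs the in-flight resolution
    // with its result once the stage completes.
    static final class BackendState {
        private final CompletionStage<Backend> stage;
        private volatile Backend result;

        BackendState(final CompletionStage<Backend> stage) {
            this.stage = stage;
            stage.thenAccept(backend -> result = backend);
        }

        CompletionStage<Backend> getStage() {
            return stage;
        }

        Backend getResult() {
            return result;
        }
    }

    private final ConcurrentMap<Long, BackendState> backends = new ConcurrentHashMap<>();

    // Resolution starts at most once per cookie; concurrent callers share the same stage.
    CompletionStage<Backend> getBackendInfo(final Long cookie) {
        return backends.computeIfAbsent(cookie, key -> new BackendState(resolve(key))).getStage();
    }

    // A caller reports its cached info as stale. If another thread already replaced
    // the entry, reuse that resolution; otherwise evict the entry and start over.
    CompletionStage<Backend> refreshBackendInfo(final Long cookie, final Backend staleInfo) {
        final BackendState existing = backends.get(cookie);
        if (existing != null) {
            if (!staleInfo.equals(existing.getResult())) {
                return existing.getStage();
            }
            backends.remove(cookie, existing);
        }
        return getBackendInfo(cookie);
    }

    // Placeholder for the real asynchronous primary-shard lookup.
    private CompletionStage<Backend> resolve(final Long cookie) {
        return CompletableFuture.completedFuture(new Backend("shard-" + cookie));
    }
}

The two-argument remove(cookie, existing) mirrors the diff: it only discards the entry the caller observed, so a resolution that another thread has already restarted is never thrown away.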