X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=dde2292241ef55344e64bbd2793a03038c13bfa3;hp=9e6485b296e0e87dc20febe7db12ab08991d4db3;hb=32b322afd58f120a78208c939a01422aa224d0cf;hpb=c7846405c83f680660852f299d8051b420b3cddd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 9e6485b296..dde2292241 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,39 +7,22 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; -import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; -import com.google.common.primitives.UnsignedLong; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; -import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; -import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestFailure; -import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.compat.java8.FutureConverters; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -48,51 +31,11 @@ import scala.compat.java8.FutureConverters; * * @author Robert Varga */ -@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION", - justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended") @ThreadSafe -final class ModuleShardBackendResolver extends BackendInfoResolver { - private static final class Entry { - private final CompletionStage stage; - @GuardedBy("this") - private ShardBackendInfo result; - - Entry(final CompletionStage stage) { - this.stage = Preconditions.checkNotNull(stage); - stage.whenComplete(this::onStageResolved); - } - - @Nonnull CompletionStage getStage() { - return stage; - } - - synchronized @Nullable ShardBackendInfo getResult() { - return result; - } - - private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) { - if (failure == null) { - this.result = Preconditions.checkNotNull(result); - } else { - LOG.warn("Failed to resolve shard", failure); - } - } - } - - private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); +final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); - /** - * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. - * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain - * non-operational. - */ - // TODO: maybe make this configurable somehow? - private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES); - - private final ConcurrentMap backends = new ConcurrentHashMap<>(); - private final AtomicLong nextSessionId = new AtomicLong(); - private final Function1 connectFunction; + private final ConcurrentMap backends = new ConcurrentHashMap<>(); private final ActorContext actorContext; @GuardedBy("this") @@ -102,9 +45,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver new ConnectClientRequest(clientId, t, ABIVersion.BORON, - ABIVersion.current())); } Long resolveShardForPath(final YangInstanceIdentifier path) { @@ -127,54 +69,36 @@ final class ModuleShardBackendResolver extends BackendInfoResolver resolveBackendInfo(final Long cookie) { + private ShardState resolveBackendInfo(final Long cookie) { final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return NULL_FUTURE; + return null; } LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { - LOG.debug("Looking up primary info for {} from {}", shardName, info); - return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); - }).thenApply(response -> { - if (response instanceof RequestFailure) { - final RequestFailure failure = (RequestFailure) response; - LOG.debug("Connect request failed {}", failure, failure.getCause()); - throw Throwables.propagate(failure.getCause()); - } - - LOG.debug("Resolved backend information to {}", response); - - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; - - return new ShardBackendInfo(success.getBackend(), - nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), - success.getDataTree(), success.getMaxMessages()); - }); + return resolveBackendInfo(shardName, cookie); } @Override - public CompletionStage getBackendInfo(final Long cookie) { - return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage(); + public CompletionStage getBackendInfo(final Long cookie) { + return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage(); } @Override - public CompletionStage refreshBackendInfo(final Long cookie, + public CompletionStage refreshBackendInfo(final Long cookie, final ShardBackendInfo staleInfo) { - final Entry existing = backends.get(cookie); + final ShardState existing = backends.get(cookie); if (existing != null) { if (!staleInfo.equals(existing.getResult())) { return existing.getStage(); } LOG.debug("Invalidating backend information {}", staleInfo); - actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName()); + flushCache(staleInfo.getShardName()); - LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo); + LOG.trace("Invalidated cache %s", staleInfo); backends.remove(cookie, existing); }