X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=ee549d36118e8f026f1ed6e83c80b842fe5bbf50;hp=de115f06638878a343499a9f72d071324e1827e9;hb=39b7a263d64559bc4f593726f56aa38ab9cc0b1c;hpb=a510fba141230ce9fe8301f9eb0198cc09df46ca diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index de115f0663..ee549d3611 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,34 +7,22 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; -import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; -import com.google.common.primitives.UnsignedLong; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.access.ABIVersion; -import org.opendaylight.controller.cluster.access.client.BackendInfo; +import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; -import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; -import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestFailure; -import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.compat.java8.FutureConverters; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -43,22 +31,12 @@ import scala.compat.java8.FutureConverters; * * @author Robert Varga */ -final class ModuleShardBackendResolver extends BackendInfoResolver { - private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); +@ThreadSafe +final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); - /** - * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. - * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain - * non-operational. - */ - // TODO: maybe make this configurable somehow? - private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES); - + private final ConcurrentMap backends = new ConcurrentHashMap<>(); private final ActorContext actorContext; - // FIXME: this counter should be in superclass somewhere - private final AtomicLong nextSessionId = new AtomicLong(); - private final Function1 connectFunction; @GuardedBy("this") private long nextShard = 1; @@ -67,21 +45,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver new ConnectClientRequest(clientId, t, ABIVersion.BORON, - ABIVersion.current())); - } - - @Override - protected void invalidateBackendInfo(final CompletionStage info) { - LOG.trace("Initiated invalidation of backend information {}", info); - info.thenAccept(this::invalidate); - } - - private void invalidate(final BackendInfo result) { - Preconditions.checkArgument(result instanceof ShardBackendInfo); - LOG.debug("Invalidating backend information {}", result); - actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName()); } Long resolveShardForPath(final YangInstanceIdentifier path) { @@ -104,43 +69,70 @@ final class ModuleShardBackendResolver extends BackendInfoResolver resolveBackendInfo(final Long cookie) { + public CompletionStage getBackendInfo(final Long cookie) { + /* + * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state + * is inserted into the map and retired from it (based on the stage result). + * + * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute + * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of + * non-existent element. + */ + final ShardState existing = backends.get(cookie); + if (existing != null) { + return existing.getStage(); + } + final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return NULL_FUTURE; + throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned"); } - final CompletableFuture ret = new CompletableFuture<>(); - - FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { - LOG.debug("Looking up primary info for {} from {}", shardName, info); - return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); - }).thenApply(response -> { - if (response instanceof RequestFailure) { - final RequestFailure failure = (RequestFailure) response; - LOG.debug("Connect request failed {}", failure, failure.getCause()); - throw Throwables.propagate(failure.getCause()); - } + LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); + final ShardState toInsert = resolveBackendInfo(shardName, cookie); - LOG.debug("Resolved backend information to {}", response); + final ShardState raced = backends.putIfAbsent(cookie, toInsert); + if (raced != null) { + // We have had a concurrent insertion, return that + LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName); + return raced.getStage(); + } - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; + // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to + // complete + final CompletionStage stage = toInsert.getStage(); + stage.whenComplete((info, failure) -> { + if (failure != null) { + LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure); + backends.remove(cookie, toInsert); - return new ShardBackendInfo(success.getBackend(), - nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), - success.getDataTree(), success.getMaxMessages()); - }).whenComplete((info, throwablw) -> { - if (throwablw != null) { - ret.completeExceptionally(throwablw); - } else { - ret.complete(info); + // Remove cache state in case someone else forgot to invalidate it + flushCache(shardName); } }); - LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return ret; + return stage; + } + + @Override + public CompletionStage refreshBackendInfo(final Long cookie, + final ShardBackendInfo staleInfo) { + final ShardState existing = backends.get(cookie); + if (existing != null) { + if (!staleInfo.equals(existing.getResult())) { + return existing.getStage(); + } + + LOG.debug("Invalidating backend information {}", staleInfo); + flushCache(staleInfo.getShardName()); + + LOG.trace("Invalidated cache %s", staleInfo); + backends.remove(cookie, existing); + } + + return getBackendInfo(cookie); } }