X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=61bb78ed3fc464d7abe502be7c8f8b833df0d773;hp=bef863ddf48a50fc81165f1d04de8d7aba691d3f;hb=c763a16f9cd72bb4f27997f301f163b83d470a24;hpb=98d1c5606bad9633ce5549bcd691a98c75abdf6a
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
index bef863ddf4..61bb78ed3f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
@@ -7,137 +7,175 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import static akka.pattern.Patterns.ask;
+import static com.google.common.base.Verify.verifyNotNull;
+
 import akka.dispatch.ExecutionContexts;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.ABIVersion;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
-import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
  * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
- * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
+ * shard leader is resolved via {@link ActorUtils}. The product of resolution is {@link ShardBackendInfo}.
+ *
+ * <p>
+ * This class is thread-safe.
  *
  * @author Robert Varga
  */
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
-            ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
-    private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
-    /**
-     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
-     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
-     * non-operational.
-     */
-    // TODO: maybe make this configurable somehow?
-    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+    private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
 
-    private final ActorContext actorContext;
+    private final Future<Registration> shardAvailabilityChangesRegFuture;
 
     @GuardedBy("this")
     private long nextShard = 1;
 
-    private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
+    private volatile ImmutableBiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
-    ModuleShardBackendResolver(final ActorContext actorContext) {
-        this.actorContext = Preconditions.checkNotNull(actorContext);
-    }
+    ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorUtils actorUtils) {
+        super(clientId, actorUtils);
 
-    @Override
-    protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
-        LOG.trace("Initiated invalidation of backend information {}", info);
-        info.thenAccept(this::invalidate);
+        shardAvailabilityChangesRegFuture = ask(actorUtils.getShardManager(), new RegisterForShardAvailabilityChanges(
+            this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES))
+                .map(reply -> (Registration) reply, ExecutionContexts.global());
+
+        shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
+            @Override
+            public void onComplete(final Throwable failure, final Registration reply) {
+                if (failure != null) {
+                    LOG.error("RegisterForShardAvailabilityChanges failed", failure);
+                }
+            }
+        }, ExecutionContexts.global());
     }
 
-    private void invalidate(final BackendInfo result) {
-        Preconditions.checkArgument(result instanceof ShardBackendInfo);
-        LOG.debug("Invalidating backend information {}", result);
-        actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo) result).getShardName());
+    private void onShardAvailabilityChange(final String shardName) {
+        LOG.debug("onShardAvailabilityChange for {}", shardName);
+
+        Long cookie = shards.get(shardName);
+        if (cookie == null) {
+            LOG.debug("No shard cookie found for {}", shardName);
+            return;
+        }
+
+        notifyStaleBackendInfoCallbacks(cookie);
     }
 
     Long resolveShardForPath(final YangInstanceIdentifier path) {
-        final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+        final String shardName = actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+        final Long cookie = shards.get(shardName);
+        return cookie != null ? cookie : populateShard(shardName);
+    }
+
+    private synchronized @NonNull Long populateShard(final String shardName) {
         Long cookie = shards.get(shardName);
         if (cookie == null) {
-            synchronized (this) {
-                cookie = shards.get(shardName);
-                if (cookie == null) {
-                    cookie = nextShard++;
-
-                    Builder<String, Long> b = ImmutableBiMap.builder();
-                    b.putAll(shards);
-                    b.put(shardName, cookie);
-                    shards = b.build();
-                }
-            }
+            cookie = nextShard++;
+            shards = ImmutableBiMap.<String, Long>builder().putAll(shards).put(shardName, cookie).build();
         }
-
         return cookie;
     }
 
     @Override
-    protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        /*
+         * We cannot perform a simple computeIfAbsent() here because we need to control the sequencing of when the
+         * state is inserted into the map and retired from it (based on the stage result).
+         *
+         * We do not want to hook another stage once processing completes, and hooking a removal on failure from a
+         * compute method runs the inherent risk of the stage completing before the insertion does (i.e. we would
+         * remove a non-existent element).
+         */
+        final ShardState existing = backends.get(cookie);
+        if (existing != null) {
+            return existing.getStage();
+        }
+
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return NULL_FUTURE;
+            throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+        final ShardState toInsert = resolveBackendInfo(shardName, cookie);
 
-        actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
-            @Override
-            public void onComplete(final Throwable t, final PrimaryShardInfo v) {
-                if (t != null) {
-                    ret.completeExceptionally(t);
-                } else {
-                    ret.complete(createBackendInfo(v, shardName, cookie));
-                }
+        final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+        if (raced != null) {
+            // We have had a concurrent insertion, return that
+            LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+            return raced.getStage();
+        }
+
+        // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+        // complete
+        final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+        stage.whenComplete((info, failure) -> {
+            if (failure != null) {
+                LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+                backends.remove(cookie, toInsert);
+
+                // Remove cache state in case someone else forgot to invalidate it
+                flushCache(shardName);
             }
-        }, DIRECT_EXECUTION_CONTEXT);
+        });
 
-        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
-        return ret;
+        return stage;
     }
 
-    private static ABIVersion toABIVersion(final short version) {
-        switch (version) {
-            case DataStoreVersions.BORON_VERSION:
-                return ABIVersion.BORON;
+    @Override
+    public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
+            final ShardBackendInfo staleInfo) {
+        final ShardState existing = backends.get(cookie);
+        if (existing != null) {
+            if (!staleInfo.equals(existing.getResult())) {
+                return existing.getStage();
+            }
+
+            LOG.debug("Invalidating backend information {}", staleInfo);
+            flushCache(staleInfo.getName());
+
+            LOG.trace("Invalidated cache {}", staleInfo);
+            backends.remove(cookie, existing);
         }
 
-        throw new IllegalArgumentException("Unsupported version " + version);
+        return getBackendInfo(cookie);
     }
 
-    private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
-        Preconditions.checkArgument(result instanceof PrimaryShardInfo);
-        final PrimaryShardInfo info = (PrimaryShardInfo) result;
+    @Override
+    public void close() {
+        shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
+            @Override
+            public void onComplete(final Throwable failure, final Registration reply) {
+                reply.close();
+            }
+        }, ExecutionContexts.global());
+    }
 
-        LOG.debug("Creating backend information for {}", info);
-        return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
-                toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
-                info.getLocalShardDataTree());
-    }
+    @Override
+    public String resolveCookieName(final Long cookie) {
+        return verifyNotNull(shards.inverse().get(cookie), "Unexpected null cookie: %s", cookie);
+    }
 }
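
The new populateShard() grows the shard-name-to-cookie mapping by rebuilding an ImmutableBiMap while synchronized and publishing it through a volatile field, so readers such as resolveShardForPath() and resolveCookieName() never take a lock. The following stand-alone sketch illustrates that copy-on-write idiom in isolation; the names CookieAssigner, cookieFor(), assign() and nameFor() are hypothetical and are not part of this patch.

import com.google.common.collect.ImmutableBiMap;

// Hypothetical sketch of the copy-on-write mapping used by populateShard(): writers rebuild the
// bimap under the object monitor, readers only dereference the volatile field.
final class CookieAssigner {
    private long nextCookie = 1;
    private volatile ImmutableBiMap<String, Long> shards = ImmutableBiMap.of("default", 0L);

    Long cookieFor(final String shardName) {
        final Long existing = shards.get(shardName);
        return existing != null ? existing : assign(shardName);
    }

    private synchronized Long assign(final String shardName) {
        // Re-check under the lock: another thread may have assigned the cookie in the meantime.
        Long cookie = shards.get(shardName);
        if (cookie == null) {
            cookie = nextCookie++;
            shards = ImmutableBiMap.<String, Long>builder().putAll(shards).put(shardName, cookie).build();
        }
        return cookie;
    }

    String nameFor(final Long cookie) {
        // The inverse view makes cookie-to-name lookups cheap, as in resolveCookieName().
        return shards.inverse().get(cookie);
    }
}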
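
Similarly, the comment in getBackendInfo() explains why the patch avoids computeIfAbsent(): the pending ShardState is published with putIfAbsent(), a concurrent winner is reused, and the entry is pruned only when its stage fails, so a later call retries the resolution. Below is a minimal, self-contained sketch of that idiom, assuming the resolution is an asynchronous CompletableFuture; the names PendingLookupCache, lookup() and resolve() are hypothetical and do not appear in the patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the caching idiom used in getBackendInfo(): publish the pending
// resolution via putIfAbsent(), defer to a concurrent winner, and prune the entry only on failure.
final class PendingLookupCache {
    private final ConcurrentMap<Long, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    CompletionStage<String> lookup(final Long cookie) {
        final CompletableFuture<String> existing = cache.get(cookie);
        if (existing != null) {
            return existing;
        }

        // Start the asynchronous resolution first, then publish it in the map.
        final CompletableFuture<String> toInsert = resolve(cookie);
        final CompletableFuture<String> raced = cache.putIfAbsent(cookie, toInsert);
        if (raced != null) {
            // A concurrent caller has already published a resolution: reuse its stage.
            return raced;
        }

        // Prune only on failure, so a later lookup() retries; successful entries remain cached.
        toInsert.whenComplete((result, failure) -> {
            if (failure != null) {
                cache.remove(cookie, toInsert);
            }
        });
        return toInsert;
    }

    private CompletableFuture<String> resolve(final Long cookie) {
        // Stand-in for the real asynchronous shard-leader lookup performed by the resolver.
        return CompletableFuture.supplyAsync(() -> "shard-" + cookie);
    }
}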