X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=249cd4524d5d1f2ea1b925008c035e850abb8a5e;hp=6f15c72a46e381123e4c78bd13084e80253e40b6;hb=2ebf9ef718ea7ddd790784a6d241e68ef8d1c564;hpb=c426700e494b8eb18e49c3384d057767a9efed35 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 6f15c72a46..249cd4524d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,22 +7,33 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableBiMap.Builder; import com.google.common.primitives.UnsignedLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.ABIVersion; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo; -import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.access.client.BackendInfo; +import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; +import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Function1; import scala.compat.java8.FutureConverters; /** @@ -33,7 +44,9 @@ import scala.compat.java8.FutureConverters; * @author Robert Varga */ final class ModuleShardBackendResolver extends BackendInfoResolver { + private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); + /** * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain @@ -43,12 +56,20 @@ final class ModuleShardBackendResolver extends BackendInfoResolver connectFunction; + + @GuardedBy("this") + private long nextShard = 1; - private volatile BiMap shards = ImmutableBiMap.of(); + private volatile BiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - ModuleShardBackendResolver(final ActorContext actorContext) { + ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext); + this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON, + ABIVersion.current())); } @Override @@ -63,35 +84,63 @@ final class ModuleShardBackendResolver extends BackendInfoResolver b = ImmutableBiMap.builder(); + b.putAll(shards); + b.put(shardName, cookie); + shards = b.build(); + } + } + } + + return cookie; + } + @Override - protected CompletionStage resolveBackendInfo(final Long cookie) { + protected CompletableFuture resolveBackendInfo(final Long cookie) { final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return CompletableFuture.completedFuture(null); + return NULL_FUTURE; } - LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)) - .thenApply(o -> createBackendInfo(o, shardName, cookie)); - } + final CompletableFuture ret = new CompletableFuture(); - private static ABIVersion toABIVersion(final short version) { - switch (version) { - case DataStoreVersions.BORON_VERSION: - return ABIVersion.BORON; - } + FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { + LOG.debug("Looking up primary info for {} from {}", shardName, info); + return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); + }).thenApply(response -> { + if (response instanceof RequestFailure) { + final RequestFailure failure = (RequestFailure) response; + LOG.debug("Connect request failed {}", failure, failure.getCause()); + throw Throwables.propagate(failure.getCause()); + } - throw new IllegalArgumentException("Unsupported version " + version); - } + LOG.debug("Resolved backend information to {}", response); - private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) { - Preconditions.checkArgument(result instanceof PrimaryShardInfo); - final PrimaryShardInfo info = (PrimaryShardInfo) result; + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; - LOG.debug("Creating backend information for {}", info); - return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(), - toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie), - info.getLocalShardDataTree()); - } + return new ShardBackendInfo(success.getBackend(), + nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), + success.getDataTree(), success.getMaxMessages()); + }).whenComplete((info, t) -> { + if (t != null) { + ret.completeExceptionally(t); + } else { + ret.complete(info); + } + }); + + LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); + return ret; + } }