X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractShardBackendResolver.java;h=46e035d0ec63ec025d0b78d1260cf519ffc51a15;hp=a81cf337acc72f7b383700093be742e6541a7f73;hb=b712eb01354ddb5878008e2a2e8f03fb19b92555;hpb=32b322afd58f120a78208c939a01422aa224d0cf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java index a81cf337ac..46e035d0ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java @@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.primitives.UnsignedLong; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -23,9 +24,14 @@ import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +77,10 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver connectFunction; @@ -96,24 +100,64 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver { - LOG.debug("Looking up primary info for {} from {}", shardName, i); - return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); - }).thenApply(response -> { - if (response instanceof RequestFailure) { - final RequestFailure failure = (RequestFailure) response; - LOG.debug("Connect request failed {}", failure, failure.getCause()); - throw Throwables.propagate(failure.getCause()); + final CompletableFuture future = new CompletableFuture<>(); + FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> { + if (failure == null) { + connectShard(shardName, cookie, info, future); + return; + } + + LOG.debug("Shard {} failed to resolve", shardName, failure); + if (failure instanceof NoShardLeaderException) { + future.completeExceptionally(wrap("Shard has no current leader", failure)); + } else if (failure instanceof NotInitializedException) { + // FIXME: this actually is an exception we can retry on + LOG.info("Shard {} has not initialized yet", shardName); + future.completeExceptionally(failure); + } else if (failure instanceof PrimaryNotFoundException) { + LOG.info("Failed to find primary for shard {}", shardName); + future.completeExceptionally(failure); + } else { + future.completeExceptionally(failure); } + }); - LOG.debug("Resolved backend information to {}", response); + return new ShardState(future); + } - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; + private static TimeoutException wrap(final String message, final Throwable cause) { + final TimeoutException ret = new TimeoutException(message); + ret.initCause(Preconditions.checkNotNull(cause)); + return ret; + } - return new ShardBackendInfo(success.getBackend(), - nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), - success.getDataTree(), success.getMaxMessages()); - })); + private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info, + final CompletableFuture future) { + LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info); + + FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT)) + .whenComplete((response, failure) -> { + if (failure != null) { + LOG.debug("Connect attempt to {} failed, will retry", shardName, failure); + future.completeExceptionally(wrap("Connection attempt failed", failure)); + return; + } + if (response instanceof RequestFailure) { + final Throwable cause = ((RequestFailure) response).getCause().unwrap(); + LOG.debug("Connect attempt to {} failed to process", shardName, cause); + final Throwable result = cause instanceof NotLeaderException + ? wrap("Leader moved during establishment", cause) : cause; + future.completeExceptionally(result); + return; + } + + LOG.debug("Resolved backend information to {}", response); + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s", + response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), + success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), + success.getMaxMessages())); + }); } }