From: Robert Varga Date: Thu, 30 Mar 2017 13:14:04 +0000 (+0200) Subject: BUG-5280: Correct reconnect retry logic X-Git-Tag: release/nitrogen~341 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9409f87fa5f6ea0a37384a85bb4e66b974fdd9a7 BUG-5280: Correct reconnect retry logic Our reconnect logic failed to account for various timers during resolution. This patch makes the BackendInfoResolver explicit about the type of failures it can report and fixes AbstractShardBackendResolver to conform to them. Change-Id: I610ddb6e062e223557d46e2950a552de6e7d3843 Signed-off-by: Robert Varga (cherry picked from commit 63bca3841f0187b5127f62fd04e4edcdce3a63c1) --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java index e4aa2b1e75..2e8ab4e103 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java @@ -17,6 +17,13 @@ import javax.annotation.Nonnull; * by either the client actor (when a message timeout is detected) and by the specific frontend (on explicit * invalidation or when updated information becomes available). * + *

+ * If the completion stage returned by this interface's methods fails with a + * {@link org.opendaylight.controller.cluster.access.concepts.RequestException}, it will be forwarded to all + * outstanding requests towards the leader. If it fails with a {@link java.util.concurrent.TimeoutException}, + * resolution process will be retried. If it fails with any other cause, it will be wrapped as a + * {@link org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException} wrapping that cause. + * * @author Robert Varga */ public abstract class BackendInfoResolver { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 45580e92fd..896d85b713 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -12,6 +12,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -29,6 +31,7 @@ import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.WritableIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. 
@@ -53,6 +56,7 @@ public abstract class ClientActorBehavior extends } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS); /** * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations @@ -213,8 +217,33 @@ public abstract class ClientActorBehavior extends private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, final T backend, final Throwable failure) { if (failure != null) { + if (failure instanceof TimeoutException) { + if (!conn.equals(connections.get(shard))) { + // AbstractClientConnection will remove itself when it decides there is no point in continuing, + // at which point we want to stop retrying + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn, + failure); + return; + } + + LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, + RESOLVE_RETRY_DURATION, failure); + context().executeInActor(b -> { + resolveConnection(shard, conn); + return b; + }, RESOLVE_RETRY_DURATION); + return; + } + LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); - conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure)); + final RequestException cause; + if (failure instanceof RequestException) { + cause = (RequestException) failure; + } else { + cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); + } + + conn.poison(cause); return; } @@ -268,12 +297,15 @@ public abstract class ClientActorBehavior extends private ConnectingClientConnection createConnection(final Long shard) { final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + resolveConnection(shard, conn); + return conn; + } + private void resolveConnection(final Long shard, final 
AbstractClientConnection conn) { + LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn); resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure); return behavior; })); - - return conn; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java index a81cf337ac..51f96e18fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java @@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.primitives.UnsignedLong; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -24,8 +24,13 @@ import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import 
org.opendaylight.controller.cluster.common.actor.ExplicitAsk; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +76,10 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver connectFunction; @@ -96,24 +99,57 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver { - LOG.debug("Looking up primary info for {} from {}", shardName, i); - return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); - }).thenApply(response -> { - if (response instanceof RequestFailure) { - final RequestFailure failure = (RequestFailure) response; - LOG.debug("Connect request failed {}", failure, failure.getCause()); - throw Throwables.propagate(failure.getCause()); + final CompletableFuture future = new CompletableFuture<>(); + FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> { + if (failure == null) { + connectShard(shardName, cookie, info, future); + return; } - LOG.debug("Resolved backend information to {}", response); + LOG.debug("Shard {} failed to resolve", shardName, failure); + if (failure instanceof NoShardLeaderException) { + // FIXME: this actually is an exception we can retry on + future.completeExceptionally(failure); + } else if (failure instanceof NotInitializedException) { + // FIXME: this actually is an exception we can retry on + LOG.info("Shard {} has not initialized yet", shardName); + future.completeExceptionally(failure); + } else if (failure instanceof PrimaryNotFoundException) { + 
LOG.info("Failed to find primary for shard {}", shardName); + future.completeExceptionally(failure); + } else { + future.completeExceptionally(failure); + } + }); - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; + return new ShardState(future); + } - return new ShardBackendInfo(success.getBackend(), - nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), - success.getDataTree(), success.getMaxMessages()); - })); + private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info, + final CompletableFuture future) { + LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info); + + FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT)) + .whenComplete((response, failure) -> { + if (failure != null) { + LOG.debug("Connect attempt to {} failed", shardName, failure); + future.completeExceptionally(failure); + return; + } + if (response instanceof RequestFailure) { + final RequestException cause = ((RequestFailure) response).getCause(); + LOG.debug("Connect attempt to {} failed to process", shardName, cause); + future.completeExceptionally(cause); + return; + } + + LOG.debug("Resolved backend information to {}", response); + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", + response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), + success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), + success.getMaxMessages())); + }); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java index 05145899b4..d1c322f86a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java @@ -110,9 +110,8 @@ public class ModuleShardBackendResolverTest { public void testGetBackendInfoFail() throws Exception { final CompletionStage i = moduleShardBackendResolver.getBackendInfo(0L); final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class); - final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException()); - final ConnectClientFailure response = - req.toRequestFailure(cause); + final RuntimeException cause = new RuntimeException(); + final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause)); contextProbe.reply(response); final CompletionStage stage = moduleShardBackendResolver.getBackendInfo(0L); final ExecutionException caught =