BUG-5280: Correct reconnect retry logic 02/54102/20
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 30 Mar 2017 13:14:04 +0000 (15:14 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 14 Apr 2017 21:08:22 +0000 (21:08 +0000)
Our reconnect logic failed to account for various timers
during resolution. This patch makes the BackendInfoResolver
explicit about the type of failures it can report and fixes
AbstractShardBackendResolver to conform to them.

Change-Id: I610ddb6e062e223557d46e2950a552de6e7d3843
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 63bca3841f0187b5127f62fd04e4edcdce3a63c1)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java

index e4aa2b1e75e267e1f6b4599c1e5c6347153e3c81..2e8ab4e103cc48636c87a8452b74a924ce6c3204 100644 (file)
@@ -17,6 +17,13 @@ import javax.annotation.Nonnull;
  * by either the client actor (when a message timeout is detected) and by the specific frontend (on explicit
  * invalidation or when updated information becomes available).
  *
+ * <p>
+ * If the completion stage returned by this interface's methods fails with a
+ * {@link org.opendaylight.controller.cluster.access.concepts.RequestException}, it will be forwarded to all
+ * outstanding requests towards the leader. If it fails with a {@link java.util.concurrent.TimeoutException},
+ * resolution process will be retries. If it fails with any other cause, it will we wrapped as a
+ * {@link org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException} wrapping that cause.
+ *
  * @author Robert Varga
  */
 public abstract class BackendInfoResolver<T extends BackendInfo> {
index 45580e92fd734489616c4840a5feac2f529aafd8..896d85b713fcecfb611cc0d23987a973125718d1 100644 (file)
@@ -12,6 +12,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -29,6 +31,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
@@ -53,6 +56,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
+    private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
     /**
      * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
@@ -213,8 +217,33 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
             final T backend, final Throwable failure) {
         if (failure != null) {
+            if (failure instanceof TimeoutException) {
+                if (!conn.equals(connections.get(shard))) {
+                    // AbstractClientConnection will remove itself when it decides there is no point in continuing,
+                    // at which point we want to stop retrying
+                    LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
+                        failure);
+                    return;
+                }
+
+                LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
+                    RESOLVE_RETRY_DURATION, failure);
+                context().executeInActor(b -> {
+                    resolveConnection(shard, conn);
+                    return b;
+                }, RESOLVE_RETRY_DURATION);
+                return;
+            }
+
             LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
-            conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure));
+            final RequestException cause;
+            if (failure instanceof RequestException) {
+                cause = (RequestException) failure;
+            } else {
+                cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
+            }
+
+            conn.poison(cause);
             return;
         }
 
@@ -268,12 +297,15 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
 
     private ConnectingClientConnection<T> createConnection(final Long shard) {
         final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
+        resolveConnection(shard, conn);
+        return conn;
+    }
 
+    private void resolveConnection(final Long shard, final AbstractClientConnection<T> conn) {
+        LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn);
         resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
             backendConnectFinished(shard, conn, backend, failure);
             return behavior;
         }));
-
-        return conn;
     }
 }
index a81cf337acc72f7b383700093be742e6541a7f73..51f96e18fde4e2f0ea66d2ca41d3f72d9ea58755 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import akka.actor.ActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -24,8 +24,13 @@ import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,12 +76,10 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
     private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
 
     /**
-     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
-     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
-     * non-operational.
+     * Connect request timeout. If the shard does not respond within this interval, we retry the lookup and connection.
      */
     // TODO: maybe make this configurable somehow?
-    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+    private static final Timeout CONNECT_TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
 
     private final AtomicLong nextSessionId = new AtomicLong();
     private final Function1<ActorRef, ?> connectFunction;
@@ -96,24 +99,57 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
     protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
 
-        return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
-            LOG.debug("Looking up primary info for {} from {}", shardName, i);
-            return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
-        }).thenApply(response -> {
-            if (response instanceof RequestFailure) {
-                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
-                LOG.debug("Connect request failed {}", failure, failure.getCause());
-                throw Throwables.propagate(failure.getCause());
+        final CompletableFuture<ShardBackendInfo> future = new CompletableFuture<>();
+        FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> {
+            if (failure == null) {
+                connectShard(shardName, cookie, info, future);
+                return;
             }
 
-            LOG.debug("Resolved backend information to {}", response);
+            LOG.debug("Shard {} failed to resolve", shardName, failure);
+            if (failure instanceof NoShardLeaderException) {
+                // FIXME: this actually is an exception we can retry on
+                future.completeExceptionally(failure);
+            } else if (failure instanceof NotInitializedException) {
+                // FIXME: this actually is an exception we can retry on
+                LOG.info("Shard {} has not initialized yet", shardName);
+                future.completeExceptionally(failure);
+            } else if (failure instanceof PrimaryNotFoundException) {
+                LOG.info("Failed to find primary for shard {}", shardName);
+                future.completeExceptionally(failure);
+            } else {
+                future.completeExceptionally(failure);
+            }
+        });
 
-            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
-            final ConnectClientSuccess success = (ConnectClientSuccess) response;
+        return new ShardState(future);
+    }
 
-            return new ShardBackendInfo(success.getBackend(),
-                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
-                success.getDataTree(), success.getMaxMessages());
-        }));
+    private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
+            final CompletableFuture<ShardBackendInfo> future) {
+        LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
+
+        FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+            .whenComplete((response, failure) -> {
+                if (failure != null) {
+                    LOG.debug("Connect attempt to {} failed", shardName, failure);
+                    future.completeExceptionally(failure);
+                    return;
+                }
+                if (response instanceof RequestFailure) {
+                    final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
+                    LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+                    future.completeExceptionally(cause);
+                    return;
+                }
+
+                LOG.debug("Resolved backend information to {}", response);
+                Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}",
+                    response);
+                final ConnectClientSuccess success = (ConnectClientSuccess) response;
+                future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+                    success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+                    success.getMaxMessages()));
+            });
     }
 }
index 05145899b47ecae34adbe98e26ab81da7d541d15..d1c322f86aad61c9642945cd7a3d78987828b80f 100644 (file)
@@ -110,9 +110,8 @@ public class ModuleShardBackendResolverTest {
     public void testGetBackendInfoFail() throws Exception {
         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
         final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
-        final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
-        final ConnectClientFailure response =
-                req.toRequestFailure(cause);
+        final RuntimeException cause = new RuntimeException();
+        final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
         contextProbe.reply(response);
         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
         final ExecutionException caught =