BUG-8515: make sure we retry connection on NotLeaderException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractShardBackendResolver.java
index 51f96e18fde4e2f0ea66d2ca41d3f72d9ea58755..a1ddcc344953f991b34450d34afb5615bc6c4149 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.primitives.UnsignedLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -23,8 +24,8 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -108,8 +109,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
 
             LOG.debug("Shard {} failed to resolve", shardName, failure);
             if (failure instanceof NoShardLeaderException) {
-                // FIXME: this actually is an exception we can retry on
-                future.completeExceptionally(failure);
+                future.completeExceptionally(wrap("Shard has no current leader", failure));
             } else if (failure instanceof NotInitializedException) {
                 // FIXME: this actually is an exception we can retry on
                 LOG.info("Shard {} has not initialized yet", shardName);
@@ -125,6 +125,12 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
         return new ShardState(future);
     }
 
+    private static TimeoutException wrap(final String message, final Throwable cause) {
+        final TimeoutException ret = new TimeoutException(message);
+        ret.initCause(Preconditions.checkNotNull(cause));
+        return ret;
+    }
+
     private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
             final CompletableFuture<ShardBackendInfo> future) {
         LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
@@ -132,14 +138,16 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
         FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
             .whenComplete((response, failure) -> {
                 if (failure != null) {
-                    LOG.debug("Connect attempt to {} failed", shardName, failure);
-                    future.completeExceptionally(failure);
+                    LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+                    future.completeExceptionally(wrap("Connection attempt failed", failure));
                     return;
                 }
                 if (response instanceof RequestFailure) {
-                    final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
+                    final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
                     LOG.debug("Connect attempt to {} failed to process", shardName, cause);
-                    future.completeExceptionally(cause);
+                    final Throwable result = cause instanceof NotLeaderException
+                            ? wrap("Leader moved during establishment", cause) : cause;
+                    future.completeExceptionally(result);
                     return;
                 }