Do not assert seal transition on forward path
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractShardBackendResolver.java
index 93cf7931e56139523146e5c36932a2eba626328e..d2b10c1b7067a038e90d910b16a0749ff4b76212 100644 (file)
@@ -11,10 +11,14 @@ import akka.actor.ActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -23,6 +27,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
@@ -31,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -63,9 +69,9 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
             return result;
         }
 
-        private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
+        private synchronized void onStageResolved(final ShardBackendInfo info, final Throwable failure) {
             if (failure == null) {
-                this.result = Preconditions.checkNotNull(result);
+                this.result = Preconditions.checkNotNull(info);
             } else {
                 LOG.warn("Failed to resolve shard", failure);
             }
@@ -83,6 +89,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
     private final AtomicLong nextSessionId = new AtomicLong();
     private final Function1<ActorRef, ?> connectFunction;
     private final ActorContext actorContext;
+    private final Set<Consumer<Long>> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet();
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
     AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
@@ -91,6 +98,20 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
             ABIVersion.current()));
     }
 
+    @Override
+    public Registration notifyWhenBackendInfoIsStale(final Consumer<Long> callback) {
+        staleBackendInfoCallbacks.add(callback);
+        return () -> staleBackendInfoCallbacks.remove(callback);
+    }
+
+    protected void notifyStaleBackendInfoCallbacks(Long cookie) {
+        staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie));
+    }
+
+    protected ActorContext actorContext() {
+        return actorContext;
+    }
+
     protected final void flushCache(final String shardName) {
         actorContext.getPrimaryShardInfoCache().remove(shardName);
     }
@@ -107,8 +128,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
 
             LOG.debug("Shard {} failed to resolve", shardName, failure);
             if (failure instanceof NoShardLeaderException) {
-                // FIXME: this actually is an exception we can retry on
-                future.completeExceptionally(failure);
+                future.completeExceptionally(wrap("Shard has no current leader", failure));
             } else if (failure instanceof NotInitializedException) {
                 // FIXME: this actually is an exception we can retry on
                 LOG.info("Shard {} has not initialized yet", shardName);
@@ -124,31 +144,42 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
         return new ShardState(future);
     }
 
+    private static TimeoutException wrap(final String message, final Throwable cause) {
+        final TimeoutException ret = new TimeoutException(message);
+        ret.initCause(Preconditions.checkNotNull(cause));
+        return ret;
+    }
+
     private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
             final CompletableFuture<ShardBackendInfo> future) {
         LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
 
         FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
-            .whenComplete((response, failure) -> {
-                if (failure != null) {
-                    LOG.debug("Connect attempt to {} failed", shardName, failure);
-                    future.completeExceptionally(failure);
-                    return;
-                }
-                if (response instanceof RequestFailure) {
-                    final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
-                    LOG.debug("Connect attempt to {} failed to process", shardName, cause);
-                    future.completeExceptionally(cause);
-                    return;
-                }
-
-                LOG.debug("Resolved backend information to {}", response);
-                Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}",
-                    response);
-                final ConnectClientSuccess success = (ConnectClientSuccess) response;
-                future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
-                    success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
-                    success.getMaxMessages()));
-            });
+            .whenComplete((response, failure) -> onConnectResponse(shardName, cookie, future, response, failure));
+    }
+
+    private void onConnectResponse(final String shardName, final long cookie,
+            final CompletableFuture<ShardBackendInfo> future, final Object response, final Throwable failure) {
+        if (failure != null) {
+            LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+            future.completeExceptionally(wrap("Connection attempt failed", failure));
+            return;
+        }
+        if (response instanceof RequestFailure) {
+            final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+            LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+            final Throwable result = cause instanceof NotLeaderException
+                    ? wrap("Leader moved during establishment", cause) : cause;
+            future.completeExceptionally(result);
+            return;
+        }
+
+        LOG.debug("Resolved backend information to {}", response);
+        Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
+            response);
+        final ConnectClientSuccess success = (ConnectClientSuccess) response;
+        future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+            success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+            success.getMaxMessages()));
     }
 }