BUG-5280: add SimpleDataStoreClientBehavior
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
index 9e6485b296e0e87dc20febe7db12ab08991d4db3..dde2292241ef55344e64bbd2793a03038c13bfa3 100644 (file)
@@ -7,39 +7,22 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.compat.java8.FutureConverters;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -48,51 +31,11 @@ import scala.compat.java8.FutureConverters;
  *
  * @author Robert Varga
  */
-@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
-                    justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
 @ThreadSafe
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final class Entry {
-        private final CompletionStage<ShardBackendInfo> stage;
-        @GuardedBy("this")
-        private ShardBackendInfo result;
-
-        Entry(final CompletionStage<ShardBackendInfo> stage) {
-            this.stage = Preconditions.checkNotNull(stage);
-            stage.whenComplete(this::onStageResolved);
-        }
-
-        @Nonnull CompletionStage<ShardBackendInfo> getStage() {
-            return stage;
-        }
-
-        synchronized @Nullable ShardBackendInfo getResult() {
-            return result;
-        }
-
-        private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
-            if (failure == null) {
-                this.result = Preconditions.checkNotNull(result);
-            } else {
-                LOG.warn("Failed to resolve shard", failure);
-            }
-        }
-    }
-
-    private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
-    /**
-     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
-     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
-     * non-operational.
-     */
-    // TODO: maybe make this configurable somehow?
-    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
-
-    private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
-    private final AtomicLong nextSessionId = new AtomicLong();
-    private final Function1<ActorRef, ?> connectFunction;
+    private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
     private final ActorContext actorContext;
 
     @GuardedBy("this")
@@ -102,9 +45,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
     ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+        super(clientId, actorContext);
         this.actorContext = Preconditions.checkNotNull(actorContext);
-        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
-            ABIVersion.current()));
     }
 
     Long resolveShardForPath(final YangInstanceIdentifier path) {
@@ -127,54 +69,36 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
         return cookie;
     }
 
-    private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    private ShardState resolveBackendInfo(final Long cookie) {
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return NULL_FUTURE;
+            return null;
         }
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
 
-        return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
-            LOG.debug("Looking up primary info for {} from {}", shardName, info);
-            return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
-        }).thenApply(response -> {
-            if (response instanceof RequestFailure) {
-                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
-                LOG.debug("Connect request failed {}", failure, failure.getCause());
-                throw Throwables.propagate(failure.getCause());
-            }
-
-            LOG.debug("Resolved backend information to {}", response);
-
-            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
-            final ConnectClientSuccess success = (ConnectClientSuccess) response;
-
-            return new ShardBackendInfo(success.getBackend(),
-                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
-                success.getDataTree(), success.getMaxMessages());
-        });
+        return resolveBackendInfo(shardName, cookie);
     }
 
     @Override
-    public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
-        return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
     }
 
     @Override
-    public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+    public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
             final ShardBackendInfo staleInfo) {
-        final Entry existing = backends.get(cookie);
+        final ShardState existing = backends.get(cookie);
         if (existing != null) {
             if (!staleInfo.equals(existing.getResult())) {
                 return existing.getStage();
             }
 
             LOG.debug("Invalidating backend information {}", staleInfo);
-            actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+            flushCache(staleInfo.getShardName());
 
-            LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+            LOG.trace("Invalidated cache %s", staleInfo);
             backends.remove(cookie, existing);
         }