BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardedDataTreeActor.java
index 3c1ae10..63fd4a0 100644 (file)
@@ -16,6 +16,8 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberExited;
@@ -31,7 +33,10 @@ import akka.cluster.ddata.Replicator;
 import akka.cluster.ddata.Replicator.Changed;
 import akka.cluster.ddata.Replicator.Subscribe;
 import akka.cluster.ddata.Replicator.Update;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
@@ -44,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -51,6 +57,8 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
@@ -62,9 +70,17 @@ import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
@@ -72,9 +88,14 @@ import scala.compat.java8.FutureConverters;
  */
 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
+
     private static final String PERSISTENCE_ID = "sharding-service-actor";
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
+    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
+    static final int LOOKUP_TASK_MAX_RETRIES = 100;
+
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
     private final ClusterWrapper clusterWrapper;
@@ -356,6 +377,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                     map -> map.put(cluster, configuration.toDataMapKey(), configuration));
 
         replicator.tell(update, self());
+
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
+                        actorContext, shardingService, configuration.getPrefix()),
+                actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
@@ -391,6 +418,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                     map -> map.remove(cluster, "prefix=" + message.getPrefix()));
         replicator.tell(removal, self());
+
+        final ShardRemovalLookupTask removalTask =
+                new ShardRemovalLookupTask(actorSystem, getSender(),
+                        actorContext, message.getPrefix());
+
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
 
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
@@ -448,6 +481,277 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private abstract static class LookupTask implements Runnable {
+
+        private final ActorRef replyTo;
+        private int retries = 0;
+
+        private LookupTask(final ActorRef replyTo) {
+            this.replyTo = replyTo;
+        }
+
+        abstract void reschedule(int retries);
+
+        void tryReschedule(@Nullable final Throwable throwable) {
+            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
+                retries++;
+                reschedule(retries);
+            } else {
+                fail(throwable);
+            }
+        }
+
+        void fail(@Nullable final Throwable throwable) {
+            if (throwable == null) {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + "Failing..")), noSender());
+            } else {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + "Failing..", throwable)), noSender());
+            }
+        }
+    }
+
+    /**
+     * Handles the lookup step of cds shard creation once the configuration is updated.
+     */
+    private static class ShardCreationLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorContext context;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardCreationLookupTask(final ActorSystem system,
+                                final ActorRef replyTo,
+                                final ClusterWrapper clusterWrapper,
+                                final ActorContext context,
+                                final DistributedShardedDOMDataTree shardingService,
+                                final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.clusterWrapper = clusterWrapper;
+            this.context = context;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
+
+                        system.scheduler().scheduleOnce(
+                                SHARD_LOOKUP_TASK_INTERVAL,
+                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
+                                        shardingService, toLookup),
+                                system.dispatcher());
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Handles the readiness step by waiting for a leader of the created shard.
+     */
+    private static class ShardLeaderLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardLeaderLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final ActorRef shard,
+                              final DistributedShardedDOMDataTree shardingService,
+                              final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
+                                    clusterWrapper.getCurrentMemberName(), toLookup);
+                            system.scheduler().scheduleOnce(
+                                    SHARD_LOOKUP_TASK_INTERVAL,
+                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+                                    system.dispatcher());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * After backend is ready this handles the last step - checking if we have a frontend shard for the backend,
+     * once this completes(which should be ready by the time the backend is created, this is just a sanity check in
+     * case they race), the future for the cds shard creation is completed and the shard is ready for use.
+     */
+    private static final class FrontendLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        FrontendLookupTask(final ActorSystem system,
+                           final ActorRef replyTo,
+                           final DistributedShardedDOMDataTree shardingService,
+                           final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
+                    shardingService.lookupShardFrontend(toLookup);
+
+            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
+                replyTo.tell(new Success(null), noSender());
+            } else {
+                tryReschedule(null);
+            }
+        }
+
+        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
+                                          final DOMDataTreeIdentifier prefix) {
+            if (entry == null) {
+                return false;
+            }
+
+            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
+                return true;
+            }
+
+            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
+     * configuration.
+     */
+    private static class ShardRemovalLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardRemovalLookupTask(final ActorSystem system,
+                               final ActorRef replyTo,
+                               final ActorContext context,
+                               final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        //TODO Shouldn't we check why findLocalShard failed?
+                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
+                                toLookup);
+                        replyTo.tell(new Success(null), noSender());
+                    } else {
+                        tryReschedule(null);
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
+                    toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
@@ -515,6 +819,5 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             verify();
             return Props.create(ShardedDataTreeActor.class, this);
         }
-
     }
 }