BUG-2138: Make DistributedShardFactory return Futures.
author Tomas Cere <tcere@cisco.com>
Wed, 30 Nov 2016 13:13:50 +0000 (14:13 +0100)
committer Tom Pantelis <tpanteli@brocade.com>
Mon, 13 Mar 2017 12:51:17 +0000 (12:51 +0000)
Since the CDS shard creation runs asynchronously, having Futures in the API
is more appropriate.

Change-Id: Iac5cb4827c6c125fd76074ea0411b13ac881f58d
Signed-off-by: Tomas Cere <tcere@cisco.com>
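
For illustration, callers now consume the API asynchronously. A minimal sketch, not part of
this change; shardFactory, prefix, replicaMembers and LOG are placeholder names:

    try {
        final CompletionStage<DistributedShardRegistration> stage =
                shardFactory.createDistributedShard(prefix, replicaMembers);

        stage.whenComplete((registration, failure) -> {
            if (failure != null) {
                LOG.error("CDS shard creation for {} failed", prefix, failure);
            } else {
                LOG.debug("CDS shard for {} is ready", prefix);
            }
        });
    } catch (final DOMDataTreeShardingConflictException e) {
        // the prefix is already claimed by another shard on the local node
    }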
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java

index 9085027c50894425a7099993500327f45718a665..05968725ccb221cd7b62b0ea38b9b66ea9e52baa 100644 (file)
@@ -389,7 +389,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                                   final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
         LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
 
-        addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id)));
+        addedConfigs.stream().filter(identifier
+            -> identifier
+            .getDatastoreType().equals(
+                    ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
+            .forEach(id -> doCreatePrefixedShard(configs.get(id)));
     }
 
     private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
@@ -581,6 +585,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 config.getPrefix());
         final String shardName = shardId.getShardName();
 
+        if (localShards.containsKey(shardName)) {
+            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+            final PrefixShardConfiguration existing =
+                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+            if (existing != null && existing.equals(config)) {
+                // we don't have to do anything here
+                return;
+            }
+        }
+
         configuration.addPrefixShardConfiguration(config);
 
         final Builder builder = newShardDatastoreContextBuilder(shardName);
index 4b60f617a23feb249b7d8d80bd6ccca4be9dd296..607e78c9d4e2a857651adca5e9dac4888c4a884a 100644 (file)
@@ -15,6 +15,7 @@ import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -27,7 +28,10 @@ public class ClusterUtils {
 
     // key for replicated configuration key
     public static final Key<ORMap<PrefixShardConfiguration>> CONFIGURATION_KEY =
-            ORMapKey.create("prefix-shard-configuration");
+            ORMapKey.create("prefix-shard-configuration-config");
+
+    public static final Key<ORMap<PrefixShardConfiguration>> OPERATIONAL_KEY =
+            ORMapKey.create("prefix-shard-configuration-oper");
 
     public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
         final String type;
@@ -76,4 +80,17 @@ public class ClusterUtils {
         });
         return builder.toString();
     }
+
+    public static Key<ORMap<PrefixShardConfiguration>> getReplicatorKey(LogicalDatastoreType type) {
+        if (LogicalDatastoreType.CONFIGURATION.equals(type)) {
+            return CONFIGURATION_KEY;
+        } else {
+            return OPERATIONAL_KEY;
+        }
+    }
+
+    public static org.opendaylight.mdsal.common.api.LogicalDatastoreType toMDSalApi(
+            final LogicalDatastoreType logicalDatastoreType) {
+        return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(logicalDatastoreType.name());
+    }
 }
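
For illustration only, the new ClusterUtils helpers might be combined as below; this sketch is
not part of the change and the local variable names are placeholders:

    final LogicalDatastoreType controllerType = LogicalDatastoreType.OPERATIONAL;

    // OPERATIONAL selects OPERATIONAL_KEY, CONFIGURATION selects CONFIGURATION_KEY
    final Key<ORMap<PrefixShardConfiguration>> replicatorKey =
            ClusterUtils.getReplicatorKey(controllerType);

    // bridges the controller enum to the mdsal enum carrying the same constant name
    final org.opendaylight.mdsal.common.api.LogicalDatastoreType mdsalType =
            ClusterUtils.toMDSalApi(controllerType);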
index 6271bf576679df006c5a450b297fc8239618d158..f1cdcd8dda9476c65e9637968fe11480b0ae4dfe 100644 (file)
@@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.sharding;
 
 import com.google.common.annotations.Beta;
 import java.util.Collection;
+import java.util.concurrent.CompletionStage;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.yangtools.concepts.Registration;
 
 /**
  * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles
@@ -31,18 +30,25 @@ public interface DistributedShardFactory {
      * @param prefix         Shard root
      * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard
      *                       should not be replicated.
-     * @return ShardRegistration that should be closed if the shard should be destroyed
-     * @throws DOMDataTreeShardingConflictException If the prefix already has a shard registered
-     * @throws DOMDataTreeProducerException in case there is a problem closing the initial producer that is used to
-     *                                      register the shard into the ShardingService
+     * @return A future that will be completed with a DistributedShardRegistration once the backend and frontend shards
+     *         are spawned.
+     * @throws DOMDataTreeShardingConflictException If the initial conflict check on the local node fails; when this
+     *         exception is thrown, the sharding configuration is not updated.
      */
-    DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix,
-                                                        Collection<MemberName> replicaMembers)
-            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
-            DOMDataTreeShardCreationFailedException;
+    CompletionStage<DistributedShardRegistration>
+        createDistributedShard(DOMDataTreeIdentifier prefix, Collection<MemberName> replicaMembers)
+            throws DOMDataTreeShardingConflictException;
 
-    interface DistributedShardRegistration extends Registration {
-        @Override
-        void close();
+    /**
+     * Registration of the CDS shard that allows you to remove the shard from the system by closing the registration.
+     * This removal is done asynchronously.
+     */
+    interface DistributedShardRegistration {
+
+        /**
+         * Removes the shard from the system asynchronously; the returned future completes once the
+         * backend shard is no longer present.
+         */
+        CompletionStage<Void> close();
     }
 }
\ No newline at end of file
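
A minimal sketch of driving the revised interface (not part of this change; shardFactory, prefix
and replicaMembers are placeholders, and exception handling is omitted). The blocking variant
matches what initDefaultShard() does in the next file:

    final CompletionStage<DistributedShardRegistration> created =
            shardFactory.createDistributedShard(prefix, replicaMembers);

    // blocking: wait until both the backend and frontend shards are spawned
    final DistributedShardRegistration registration = created.toCompletableFuture().get();

    // removal is asynchronous as well; close() returns a CompletionStage<Void>
    registration.close().toCompletableFuture().get();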
index 0bb6aac36fc86b6285b06ca227592315fb178d66..9c5f5b21df73df9ef89b914401630586bd14bab5 100644 (file)
@@ -16,6 +16,8 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.Member;
+import akka.dispatch.Mapper;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -29,6 +31,8 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
@@ -64,6 +68,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
+import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
@@ -74,10 +81,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
 
-    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
     private static final int MAX_ACTOR_CREATION_RETRIES = 100;
     private static final int ACTOR_RETRY_DELAY = 100;
     private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
+    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
+                    ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
+                    TimeUnit.SECONDS);
+    static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
 
     static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
 
@@ -118,14 +128,14 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                     initDefaultShard(LogicalDatastoreType.CONFIGURATION));
-        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+        } catch (final InterruptedException | ExecutionException e) {
             LOG.error("Unable to create default shard frontend for config shard", e);
         }
 
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                     initDefaultShard(LogicalDatastoreType.OPERATIONAL));
-        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+        } catch (final InterruptedException | ExecutionException e) {
             LOG.error("Unable to create default shard frontend for operational shard", e);
         }
     }
@@ -163,11 +173,9 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    //TODO: it would be better to block here until the message is processed by the actor
-    public DistributedShardRegistration createDistributedShard(
+    public CompletionStage<DistributedShardRegistration> createDistributedShard(
             final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
-            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
+            throws DOMDataTreeShardingConflictException {
         final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                 shards.lookup(prefix);
         if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
@@ -175,10 +183,27 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                     "Prefix " + prefix + " is already occupied by another shard.");
         }
 
-        PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
-        shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());
-
-        return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
+        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+
+        final Future<Object> ask =
+                Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
+
+        final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
+                new Mapper<Object, DistributedShardRegistration>() {
+                    @Override
+                    public DistributedShardRegistration apply(final Object parameter) {
+                        return new DistributedShardRegistrationImpl(
+                                prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
+                    }
+                },
+                new Mapper<Throwable, Throwable>() {
+                    @Override
+                    public Throwable apply(final Throwable throwable) {
+                        return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
+                    }
+                }, actorSystem.dispatcher());
+
+        return FutureConverters.toJava(shardRegistrationFuture);
     }
 
     void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
@@ -248,6 +273,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         shards.remove(prefix);
     }
 
+    DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
+            final DOMDataTreeIdentifier prefix) {
+        return shards.lookup(prefix);
+
+    }
+
     DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
         return shardedDOMDataTree.createProducer(prefix);
     }
@@ -287,14 +318,23 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
-            throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
+            throws ExecutionException, InterruptedException {
         final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
                 Cluster.get(actorSystem).state().members()).asJavaCollection();
         final Collection<MemberName> names = Collections2.transform(members,
             m -> MemberName.forName(m.roles().iterator().next()));
 
-        return createDistributedShard(
-                new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
+        try {
+            // we should probably only have one node create the default shards
+            return createDistributedShard(
+                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
+                    .toCompletableFuture().get();
+        } catch (DOMDataTreeShardingConflictException e) {
+            LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+            return new DistributedShardRegistrationImpl(
+                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
+                    shardedDataTreeActor, this);
+        }
     }
 
     private static void closeProducer(final DOMDataTreeProducer producer) {
@@ -326,7 +366,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
     }
 
-    private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
+    private class DistributedShardRegistrationImpl implements DistributedShardRegistration {
 
         private final DOMDataTreeIdentifier prefix;
         private final ActorRef shardedDataTreeActor;
@@ -341,11 +381,28 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         }
 
         @Override
-        public void close() {
+        public CompletionStage<Void> close() {
             // first despawn on the local node
             distributedShardedDOMDataTree.despawnShardFrontend(prefix);
             // update the config so the remote nodes are updated
-            shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
+            final Future<Object> ask =
+                    Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+
+            final Future<Void> closeFuture = ask.transform(
+                    new Mapper<Object, Void>() {
+                        @Override
+                        public Void apply(Object parameter) {
+                            return null;
+                        }
+                    },
+                    new Mapper<Throwable, Throwable>() {
+                        @Override
+                        public Throwable apply(Throwable throwable) {
+                            return throwable;
+                        }
+                    }, actorSystem.dispatcher());
+
+            return FutureConverters.toJava(closeFuture);
         }
     }
 
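
The createDistributedShard() and close() implementations above share one bridging pattern: ask
the sharding actor, map the Scala Future, and convert it to a Java CompletionStage. A stand-alone
sketch of that pattern under assumed names (target, msg, actorSystem):

    final Future<Object> asked = Patterns.ask(target, msg, SHARD_FUTURE_TIMEOUT);

    final Future<String> mapped = asked.transform(
            new Mapper<Object, String>() {
                @Override
                public String apply(final Object reply) {
                    // map the successful reply to the value the caller should see
                    return reply.toString();
                }
            },
            new Mapper<Throwable, Throwable>() {
                @Override
                public Throwable apply(final Throwable failure) {
                    // wrap the failure into a caller-facing exception
                    return new RuntimeException("ask failed", failure);
                }
            }, actorSystem.dispatcher());

    final CompletionStage<String> stage = FutureConverters.toJava(mapped);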
index 3c1ae1069e3a656765ba834236fdbf564c3016a1..63fd4a0867ddb340fd55658e01c3e29e515fd163 100644 (file)
@@ -16,6 +16,8 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberExited;
@@ -31,7 +33,10 @@ import akka.cluster.ddata.Replicator;
 import akka.cluster.ddata.Replicator.Changed;
 import akka.cluster.ddata.Replicator.Subscribe;
 import akka.cluster.ddata.Replicator.Update;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
@@ -44,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -51,6 +57,8 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
@@ -62,9 +70,17 @@ import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
@@ -72,9 +88,14 @@ import scala.compat.java8.FutureConverters;
  */
 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
+
     private static final String PERSISTENCE_ID = "sharding-service-actor";
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
+    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
+    static final int LOOKUP_TASK_MAX_RETRIES = 100;
+
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
     private final ClusterWrapper clusterWrapper;
@@ -356,6 +377,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                     map -> map.put(cluster, configuration.toDataMapKey(), configuration));
 
         replicator.tell(update, self());
+
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
+                        actorContext, shardingService, configuration.getPrefix()),
+                actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
@@ -391,6 +418,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                     map -> map.remove(cluster, "prefix=" + message.getPrefix()));
         replicator.tell(removal, self());
+
+        final ShardRemovalLookupTask removalTask =
+                new ShardRemovalLookupTask(actorSystem, getSender(),
+                        actorContext, message.getPrefix());
+
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
 
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
@@ -448,6 +481,277 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private abstract static class LookupTask implements Runnable {
+
+        private final ActorRef replyTo;
+        private int retries = 0;
+
+        private LookupTask(final ActorRef replyTo) {
+            this.replyTo = replyTo;
+        }
+
+        abstract void reschedule(int retries);
+
+        void tryReschedule(@Nullable final Throwable throwable) {
+            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
+                retries++;
+                reschedule(retries);
+            } else {
+                fail(throwable);
+            }
+        }
+
+        void fail(@Nullable final Throwable throwable) {
+            if (throwable == null) {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + "Failing..")), noSender());
+            } else {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + "Failing..", throwable)), noSender());
+            }
+        }
+    }
+
+    /**
+     * Handles the lookup step of cds shard creation once the configuration is updated.
+     */
+    private static class ShardCreationLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorContext context;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardCreationLookupTask(final ActorSystem system,
+                                final ActorRef replyTo,
+                                final ClusterWrapper clusterWrapper,
+                                final ActorContext context,
+                                final DistributedShardedDOMDataTree shardingService,
+                                final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.clusterWrapper = clusterWrapper;
+            this.context = context;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
+
+                        system.scheduler().scheduleOnce(
+                                SHARD_LOOKUP_TASK_INTERVAL,
+                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
+                                        shardingService, toLookup),
+                                system.dispatcher());
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Handles the readiness step by waiting for a leader of the created shard.
+     */
+    private static class ShardLeaderLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardLeaderLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final ActorRef shard,
+                              final DistributedShardedDOMDataTree shardingService,
+                              final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
+                                    clusterWrapper.getCurrentMemberName(), toLookup);
+                            system.scheduler().scheduleOnce(
+                                    SHARD_LOOKUP_TASK_INTERVAL,
+                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+                                    system.dispatcher());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * After the backend is ready, this handles the last step: checking whether we have a frontend shard for the
+     * backend. Once this completes (the frontend should be ready by the time the backend is created; this is just
+     * a sanity check in case they race), the future for the CDS shard creation is completed and the shard is ready
+     * for use.
+     */
+    private static final class FrontendLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        FrontendLookupTask(final ActorSystem system,
+                           final ActorRef replyTo,
+                           final DistributedShardedDOMDataTree shardingService,
+                           final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
+                    shardingService.lookupShardFrontend(toLookup);
+
+            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
+                replyTo.tell(new Success(null), noSender());
+            } else {
+                tryReschedule(null);
+            }
+        }
+
+        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
+                                          final DOMDataTreeIdentifier prefix) {
+            if (entry == null) {
+                return false;
+            }
+
+            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
+                return true;
+            }
+
+            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Task that is run once a CDS shard registration is closed; it completes once the backend shard is removed
+     * from the configuration.
+     */
+    private static class ShardRemovalLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardRemovalLookupTask(final ActorSystem system,
+                               final ActorRef replyTo,
+                               final ActorContext context,
+                               final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        //TODO Shouldn't we check why findLocalShard failed?
+                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
+                                toLookup);
+                        replyTo.tell(new Success(null), noSender());
+                    } else {
+                        tryReschedule(null);
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
+                    toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
@@ -515,6 +819,5 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             verify();
             return Props.create(ShardedDataTreeActor.class, this);
         }
-
     }
 }
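
All four lookup tasks above follow the same bounded-retry scheme: run once, check a condition,
and either complete the asker's future via Status.Success/Status.Failure or reschedule until
LOOKUP_TASK_MAX_RETRIES is exhausted. A compressed sketch, assuming an ActorSystem "system",
the asker's ActorRef "replyTo" and a placeholder checkCondition() hook:

    final class BoundedRetryTask implements Runnable {
        private int retries = 0;

        @Override
        public void run() {
            if (checkCondition()) {
                // condition met, complete the Patterns.ask future on the caller side
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            } else if (retries++ < LOOKUP_TASK_MAX_RETRIES) {
                // not there yet, try again after the lookup interval
                system.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, this, system.dispatcher());
            } else {
                replyTo.tell(new Status.Failure(new DOMDataTreeShardCreationFailedException(
                        "Unable to find the backend shard. Failing..")), ActorRef.noSender());
            }
        }

        private boolean checkCondition() {
            return false; // placeholder for the shard / leader / frontend lookup
        }
    }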
index 8256edd27230dd4821f0e0ca2d386cce3cfe11bc..c0df6bd0059cf106ec549c83692ef49b20854e05 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
@@ -14,6 +15,9 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import scala.compat.java8.FutureConverters;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 public abstract class AbstractTest {
     protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
@@ -40,4 +44,9 @@ public abstract class AbstractTest {
     protected static LocalHistoryIdentifier nextHistoryId() {
         return new LocalHistoryIdentifier(CLIENT_ID, HISTORY_COUNTER.incrementAndGet());
     }
+
+    protected static <T> T waitOnAsyncTask(final CompletionStage<T> completionStage, final Duration timeout)
+            throws Exception {
+        return Await.result(FutureConverters.toScala(completionStage), timeout);
+    }
 }
index f054100134448ae0816a21922ed9f5e7fb678c62..27c5c49d70ec114bbb5b18414bb85728aa7a885f 100644 (file)
@@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -75,11 +74,14 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
 
     private final Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+                    .logicalStoreType(
+                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
-                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+                    .logicalStoreType(
+                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -155,8 +157,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         leaderTestKit.waitForMembersUp("member-2");
 
         final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(
-                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
@@ -208,8 +211,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
         LOG.warn("registering first shard");
         final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(TEST_ID,
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
@@ -259,24 +263,24 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     public void testMultipleShardRegistrations() throws Exception {
         initEmptyDatastores("config");
 
-        final DistributedShardRegistration reg1 = leaderShardFactory
-                .createDistributedShard(TEST_ID,
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg2 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg3 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg4 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -356,9 +360,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
-            final DistributedShardRegistration reg1 = leaderShardFactory
-                    .createDistributedShard(TEST_ID,
-                            Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -369,7 +373,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
             assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-            reg1.close();
+            waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
index cd21a4c19b4beb57de5167758181140af6b5c801..9841ca01250f27ef1ca0ede6fa5a28e5dd12e351 100644 (file)
@@ -51,10 +51,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -70,6 +66,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -156,49 +153,38 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     public void testWritesIntoDefaultShard() throws Exception {
         initEmptyDatastore("config");
 
-        leaderShardFactory.createDistributedShard(TEST_ID,
-                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
-
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
-
         final DOMDataTreeIdentifier configRoot =
                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
 
         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
 
         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
-        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+        final DOMDataTreeWriteCursor cursor =
+                tx.createCursor(new DOMDataTreeIdentifier(
+                        LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY));
         Assert.assertNotNull(cursor);
+
+        final ContainerNode test =
+                ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).build();
+
+        cursor.write(test.getIdentifier(), test);
+        cursor.close();
+
+        tx.submit().checkedGet();
     }
 
     @Test
     public void testSingleNodeWrites() throws Exception {
         initEmptyDatastore("config");
 
-        leaderShardFactory.createDistributedShard(TEST_ID,
-                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
-        LOG.warn("Got after waiting for nonleader");
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
-
-        new JavaTestKit(leaderSystem) {
-            {
-                leaderShardManager.tell(
-                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
-                leaderDistributedDataStore.getActorContext().getShardManager().tell(
-                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
-            }
-        };
-
         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
 
         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
@@ -212,7 +198,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                 valueToCheck);
 
         cursor.close();
-        LOG.warn("Got to pre submit");
+        LOG.debug("Got to pre submit");
 
         tx.submit().checkedGet();
 
@@ -241,8 +227,10 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     public void testMultipleWritesIntoSingleMapEntry() throws Exception {
         initEmptyDatastore("config");
 
-        final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
@@ -255,8 +243,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         final YangInstanceIdentifier oid1 = getOuterListIdFor(0);
         final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1);
 
-        final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath,
-                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        final DistributedShardRegistration outerListShardReg = waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(outerListPath, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier()));
@@ -336,16 +325,27 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     public void testDistributedData() throws Exception {
         initEmptyDatastore("config");
 
-        leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
-        leaderShardFactory.createDistributedShard(
-                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
-                Lists.newArrayList(AbstractTest.MEMBER_NAME));
-        leaderShardFactory.createDistributedShard(
-                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
-                Lists.newArrayList(AbstractTest.MEMBER_NAME));
-        leaderShardFactory.createDistributedShard(
-                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
-                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+        waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+        waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+        waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -364,9 +364,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
-            final DistributedShardRegistration reg1 = leaderShardFactory
-                    .createDistributedShard(TEST_ID,
-                            Lists.newArrayList(AbstractTest.MEMBER_NAME));
+            final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -374,11 +374,10 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
             assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-            reg1.close();
+            waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
         }
     }