From 7204c455a1636a7fc89bcd28fe9e9000eaa81b3b Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 30 Nov 2016 14:13:50 +0100 Subject: [PATCH] BUG-2138: Make DistributedShardFactory return Futures. Since the cds shard creation runs asynchronously, having Futures in the API is more appropriate. Change-Id: Iac5cb4827c6c125fd76074ea0411b13ac881f58d Signed-off-by: Tomas Cere --- .../datastore/shardmanager/ShardManager.java | 17 +- .../cluster/datastore/utils/ClusterUtils.java | 19 +- .../sharding/DistributedShardFactory.java | 32 +- .../DistributedShardedDOMDataTree.java | 91 +++++- .../sharding/ShardedDataTreeActor.java | 305 +++++++++++++++++- .../cluster/datastore/AbstractTest.java | 9 + ...ributedShardedDOMDataTreeRemotingTest.java | 56 ++-- .../DistributedShardedDOMDataTreeTest.java | 99 +++--- 8 files changed, 519 insertions(+), 109 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 9085027c50..05968725cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -389,7 +389,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Map configs) { LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs); - addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id))); + addedConfigs.stream().filter(identifier + -> identifier + .getDatastoreType().equals( + ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType()))) + .forEach(id -> doCreatePrefixedShard(configs.get(id))); } private void resolveUpdates(Set maybeUpdatedConfigs) { @@ -581,6 +585,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { config.getPrefix()); final String shardName = shardId.getShardName(); + if (localShards.containsKey(shardName)) { + LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); + final PrefixShardConfiguration existing = + configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); + + if (existing != null && existing.equals(config)) { + // we don't need to do anything here + return; + } + } + configuration.addPrefixShardConfiguration(config); final Builder builder = newShardDatastoreContextBuilder(shardName); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java index 4b60f617a2..607e78c9d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java @@ -15,6 +15,7 @@ import java.util.Map; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; 
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -27,7 +28,10 @@ public class ClusterUtils { // key for replicated configuration key public static final Key> CONFIGURATION_KEY = - ORMapKey.create("prefix-shard-configuration"); + ORMapKey.create("prefix-shard-configuration-config"); + + public static final Key> OPERATIONAL_KEY = + ORMapKey.create("prefix-shard-configuration-oper"); public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) { final String type; @@ -76,4 +80,17 @@ public class ClusterUtils { }); return builder.toString(); } + + public static Key> getReplicatorKey(LogicalDatastoreType type) { + if (LogicalDatastoreType.CONFIGURATION.equals(type)) { + return CONFIGURATION_KEY; + } else { + return OPERATIONAL_KEY; + } + } + + public static org.opendaylight.mdsal.common.api.LogicalDatastoreType toMDSalApi( + final LogicalDatastoreType logicalDatastoreType) { + return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(logicalDatastoreType.name()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java index 6271bf5766..f1cdcd8dda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java @@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.sharding; import com.google.common.annotations.Beta; import java.util.Collection; +import java.util.concurrent.CompletionStage; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.yangtools.concepts.Registration; /** * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles @@ -31,18 +30,25 @@ public interface DistributedShardFactory { * @param prefix Shard root * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard * should not be replicated. - * @return ShardRegistration that should be closed if the shard should be destroyed - * @throws DOMDataTreeShardingConflictException If the prefix already has a shard registered - * @throws DOMDataTreeProducerException in case there is a problem closing the initial producer that is used to - * register the shard into the ShardingService + * @return A future that will be completed with a DistributedShardRegistration once the backend and frontend shards + * are spawned. + * @throws DOMDataTreeShardingConflictException If the initial check for a conflict on the local node fails, the + * sharding configuration won't be updated if this exception is thrown. 
*/ - DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix, - Collection replicaMembers) - throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException, - DOMDataTreeShardCreationFailedException; + CompletionStage + createDistributedShard(DOMDataTreeIdentifier prefix, Collection replicaMembers) + throws DOMDataTreeShardingConflictException; - interface DistributedShardRegistration extends Registration { - @Override - void close(); + /** + * Registration of the CDS shard that allows you to remove the shard from the system by closing the registration. + * This removal is done asynchronously. + */ + interface DistributedShardRegistration { + + /** + * Removes the shard from the system, this removal is done asynchronously, the future completes once the + * backend shard is no longer present. + */ + CompletionStage close(); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 0bb6aac36f..9c5f5b21df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -16,6 +16,8 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.cluster.Cluster; import akka.cluster.Member; +import akka.dispatch.Mapper; +import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -29,6 +31,8 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.ActorSystemProvider; @@ -64,6 +68,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; +import scala.compat.java8.FutureConverters; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via @@ -74,10 +81,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class); - private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); private static final int MAX_ACTOR_CREATION_RETRIES = 100; private static final int ACTOR_RETRY_DELAY = 100; private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS; + static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration( + ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3, + TimeUnit.SECONDS); + static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION); static final String ACTOR_ID = "ShardedDOMDataTreeFrontend"; @@ -118,14 +128,14 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat try { 
defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION, initDefaultShard(LogicalDatastoreType.CONFIGURATION)); - } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) { + } catch (final InterruptedException | ExecutionException e) { LOG.error("Unable to create default shard frontend for config shard", e); } try { defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL, initDefaultShard(LogicalDatastoreType.OPERATIONAL)); - } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) { + } catch (final InterruptedException | ExecutionException e) { LOG.error("Unable to create default shard frontend for operational shard", e); } } @@ -163,11 +173,9 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } @Override - @SuppressWarnings("checkstyle:IllegalCatch") - //TODO: it would be better to block here until the message is processed by the actor - public DistributedShardRegistration createDistributedShard( + public CompletionStage createDistributedShard( final DOMDataTreeIdentifier prefix, final Collection replicaMembers) - throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException { + throws DOMDataTreeShardingConflictException { final DOMDataTreePrefixTableEntry> lookup = shards.lookup(prefix); if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) { @@ -175,10 +183,27 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat "Prefix " + prefix + " is already occupied by another shard."); } - PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); - shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender()); - - return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this); + final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); + + final Future ask = + Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT); + + final Future shardRegistrationFuture = ask.transform( + new Mapper() { + @Override + public DistributedShardRegistration apply(final Object parameter) { + return new DistributedShardRegistrationImpl( + prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this); + } + }, + new Mapper() { + @Override + public Throwable apply(final Throwable throwable) { + return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable); + } + }, actorSystem.dispatcher()); + + return FutureConverters.toJava(shardRegistrationFuture); } void resolveShardAdditions(final Set additions) { @@ -248,6 +273,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat shards.remove(prefix); } + DOMDataTreePrefixTableEntry> lookupShardFrontend( + final DOMDataTreeIdentifier prefix) { + return shards.lookup(prefix); + + } + DOMDataTreeProducer localCreateProducer(final Collection prefix) { return shardedDOMDataTree.createProducer(prefix); } @@ -287,14 +318,23 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType) - throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException { + throws ExecutionException, InterruptedException { final Collection members = JavaConverters.asJavaCollectionConverter( Cluster.get(actorSystem).state().members()).asJavaCollection(); final Collection names = 
Collections2.transform(members, m -> MemberName.forName(m.roles().iterator().next())); - return createDistributedShard( - new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names); + try { + // we should probably only have one node create the default shards + return createDistributedShard( + new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names) + .toCompletableFuture().get(); + } catch (DOMDataTreeShardingConflictException e) { + LOG.debug("Default shard already registered, possibly due to other node doing it faster"); + return new DistributedShardRegistrationImpl( + new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), + shardedDataTreeActor, this); + } } private static void closeProducer(final DOMDataTreeProducer producer) { @@ -326,7 +366,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException); } - private static class DistributedShardRegistrationImpl implements DistributedShardRegistration { + private class DistributedShardRegistrationImpl implements DistributedShardRegistration { private final DOMDataTreeIdentifier prefix; private final ActorRef shardedDataTreeActor; @@ -341,11 +381,28 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } @Override - public void close() { + public CompletionStage close() { // first despawn on the local node distributedShardedDOMDataTree.despawnShardFrontend(prefix); // update the config so the remote nodes are updated - shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender()); + final Future ask = + Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT); + + final Future closeFuture = ask.transform( + new Mapper() { + @Override + public Void apply(Object parameter) { + return null; + } + }, + new Mapper() { + @Override + public Throwable apply(Throwable throwable) { + return throwable; + } + }, actorSystem.dispatcher()); + + return FutureConverters.toJava(closeFuture); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index 3c1ae1069e..63fd4a0867 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -16,6 +16,8 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; @@ -31,7 +33,10 @@ import akka.cluster.ddata.Replicator; import akka.cluster.ddata.Replicator.Changed; import akka.cluster.ddata.Replicator.Subscribe; import akka.cluster.ddata.Replicator.Update; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; @@ -44,6 +49,7 @@ import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -51,6 +57,8 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved; @@ -62,9 +70,17 @@ import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; +import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; +import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.compat.java8.FutureConverters; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote @@ -72,9 +88,14 @@ import scala.compat.java8.FutureConverters; */ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { + private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class); + private static final String PERSISTENCE_ID = "sharding-service-actor"; private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); + static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS); + static final int LOOKUP_TASK_MAX_RETRIES = 100; + private final DistributedShardedDOMDataTree shardingService; private final ActorSystem actorSystem; private final ClusterWrapper clusterWrapper; @@ -356,6 +377,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { map -> map.put(cluster, configuration.toDataMapKey(), configuration)); replicator.tell(update, self()); + + // schedule a notification task for the reply + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, + new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, + actorContext, shardingService, configuration.getPrefix()), + actorSystem.dispatcher()); } private void onPrefixShardCreated(final PrefixShardCreated message) { @@ -391,6 +418,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), map -> map.remove(cluster, "prefix=" + 
message.getPrefix())); replicator.tell(removal, self()); + + final ShardRemovalLookupTask removalTask = + new ShardRemovalLookupTask(actorSystem, getSender(), + actorContext, message.getPrefix()); + + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); } private void onPrefixShardRemoved(final PrefixShardRemoved message) { @@ -448,6 +481,277 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } + private abstract static class LookupTask implements Runnable { + + private final ActorRef replyTo; + private int retries = 0; + + private LookupTask(final ActorRef replyTo) { + this.replyTo = replyTo; + } + + abstract void reschedule(int retries); + + void tryReschedule(@Nullable final Throwable throwable) { + if (retries <= LOOKUP_TASK_MAX_RETRIES) { + retries++; + reschedule(retries); + } else { + fail(throwable); + } + } + + void fail(@Nullable final Throwable throwable) { + if (throwable == null) { + replyTo.tell(new Failure( + new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." + + "Failing..")), noSender()); + } else { + replyTo.tell(new Failure( + new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." + + "Failing..", throwable)), noSender()); + } + } + } + + /** + * Handles the lookup step of cds shard creation once the configuration is updated. + */ + private static class ShardCreationLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ClusterWrapper clusterWrapper; + private final ActorContext context; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + + ShardCreationLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ClusterWrapper clusterWrapper, + final ActorContext context, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup) { + super(replyTo); + this.system = system; + this.replyTo = replyTo; + this.clusterWrapper = clusterWrapper; + this.context = context; + this.shardingService = shardingService; + this.toLookup = toLookup; + } + + @Override + public void run() { + final Future localShardFuture = + context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); + + localShardFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + if (throwable != null) { + tryReschedule(throwable); + } else { + LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup); + + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, + new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, + shardingService, toLookup), + system.dispatcher()); + } + } + }, system.dispatcher()); + } + + @Override + void reschedule(int retries) { + LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher()); + } + } + + /** + * Handles the readiness step by waiting for a leader of the created shard. 
+ */ + private static class ShardLeaderLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + private final ClusterWrapper clusterWrapper; + private final ActorRef shard; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + + ShardLeaderLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final ClusterWrapper clusterWrapper, + final ActorRef shard, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup) { + super(replyTo); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.shard = shard; + this.shardingService = shardingService; + this.toLookup = toLookup; + } + + @Override + public void run() { + + final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); + + ask.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + if (throwable != null) { + tryReschedule(throwable); + } else { + final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; + final java.util.Optional leaderActor = findLeader.getLeaderActor(); + if (leaderActor.isPresent()) { + // leader is found, backend seems ready, check if the frontend is ready + LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..", + clusterWrapper.getCurrentMemberName(), toLookup); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, + new FrontendLookupTask(system, replyTo, shardingService, toLookup), + system.dispatcher()); + } else { + tryReschedule(null); + } + } + } + }, system.dispatcher()); + + } + + @Override + void reschedule(int retries) { + LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..", + clusterWrapper.getCurrentMemberName(), toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher()); + } + } + + /** + * After backend is ready this handles the last step - checking if we have a frontend shard for the backend, + * once this completes(which should be ready by the time the backend is created, this is just a sanity check in + * case they race), the future for the cds shard creation is completed and the shard is ready for use. 
+ */ + private static final class FrontendLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + + FrontendLookupTask(final ActorSystem system, + final ActorRef replyTo, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup) { + super(replyTo); + this.system = system; + this.replyTo = replyTo; + this.shardingService = shardingService; + this.toLookup = toLookup; + } + + @Override + public void run() { + final DOMDataTreePrefixTableEntry> entry = + shardingService.lookupShardFrontend(toLookup); + + if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) { + replyTo.tell(new Success(null), noSender()); + } else { + tryReschedule(null); + } + } + + private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry entry, + final DOMDataTreeIdentifier prefix) { + if (entry == null) { + return false; + } + + if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) { + return true; + } + + if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) { + return true; + } + + return false; + } + + @Override + void reschedule(int retries) { + LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher()); + } + } + + /** + * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the + * configuration. + */ + private static class ShardRemovalLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + private final DOMDataTreeIdentifier toLookup; + + ShardRemovalLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final DOMDataTreeIdentifier toLookup) { + super(replyTo); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.toLookup = toLookup; + } + + @Override + public void run() { + final Future localShardFuture = + context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); + + localShardFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + if (throwable != null) { + //TODO Shouldn't we check why findLocalShard failed? 
+ LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", + toLookup); + replyTo.tell(new Success(null), noSender()); + } else { + tryReschedule(null); + } + } + }, system.dispatcher()); + } + + @Override + void reschedule(int retries) { + LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..", + toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher()); + } + } + public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; @@ -515,6 +819,5 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { verify(); return Props.create(ShardedDataTreeActor.class, this); } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java index 8256edd272..c0df6bd005 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; @@ -14,6 +15,9 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import scala.compat.java8.FutureConverters; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; public abstract class AbstractTest { protected static final MemberName MEMBER_NAME = MemberName.forName("member-1"); @@ -40,4 +44,9 @@ public abstract class AbstractTest { protected static LocalHistoryIdentifier nextHistoryId() { return new LocalHistoryIdentifier(CLIENT_ID, HISTORY_COUNTER.incrementAndGet()); } + + protected static T waitOnAsyncTask(final CompletionStage completionStage, final Duration timeout) + throws Exception { + return Await.result(FutureConverters.toScala(completionStage), timeout); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index f054100134..27c5c49d70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import 
org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -75,11 +74,14 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private final Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) + .logicalStoreType( + org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); private final DatastoreContext.Builder followerDatastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) - .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + .logicalStoreType( + org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; @@ -155,8 +157,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { leaderTestKit.waitForMembersUp("member-2"); final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard( - TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + waitOnAsyncTask(leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); @@ -208,8 +211,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { LOG.warn("registering first shard"); final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + waitOnAsyncTask(leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); @@ -259,24 +263,24 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { public void testMultipleShardRegistrations() throws Exception { initEmptyDatastores("config"); - final DistributedShardRegistration reg1 = leaderShardFactory - .createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - final DistributedShardRegistration reg2 = leaderShardFactory - .createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, 
TestModel.OUTER_CONTAINER_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - final DistributedShardRegistration reg3 = leaderShardFactory - .createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - final DistributedShardRegistration reg4 = leaderShardFactory - .createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); @@ -356,9 +360,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { for (int i = 0; i < 10; i++) { LOG.debug("Round {}", i); - final DistributedShardRegistration reg1 = leaderShardFactory - .createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); @@ -369,7 +373,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - reg1.close(); + waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index cd21a4c19b..9841ca0125 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -51,10 +51,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; -import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -70,6 +66,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -156,49 +153,38 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { public void testWritesIntoDefaultShard() throws Exception { initEmptyDatastore("config"); - leaderShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); - final DOMDataTreeIdentifier configRoot = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot)); final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); - final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID); + final DOMDataTreeWriteCursor cursor = + tx.createCursor(new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)); Assert.assertNotNull(cursor); + + final ContainerNode test = + ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).build(); + + cursor.write(test.getIdentifier(), test); + cursor.close(); + + tx.submit().checkedGet(); } @Test public void testSingleNodeWrites() throws Exception { initEmptyDatastore("config"); - leaderShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration shardRegistration = waitOnAsyncTask( + leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); 
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - LOG.warn("Got after waiting for nonleader"); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); - - new JavaTestKit(leaderSystem) { - { - leaderShardManager.tell( - new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - - leaderDistributedDataStore.getActorContext().getShardManager().tell( - new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - } - }; - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); @@ -212,7 +198,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { valueToCheck); cursor.close(); - LOG.warn("Got to pre submit"); + LOG.debug("Got to pre submit"); tx.submit().checkedGet(); @@ -241,8 +227,10 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { public void testMultipleWritesIntoSingleMapEntry() throws Exception { initEmptyDatastore("config"); - final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); + final DistributedShardRegistration shardRegistration = waitOnAsyncTask( + leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); @@ -255,8 +243,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { final YangInstanceIdentifier oid1 = getOuterListIdFor(0); final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1); - final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath, - Lists.newArrayList(AbstractTest.MEMBER_NAME)); + final DistributedShardRegistration outerListShardReg = waitOnAsyncTask( + leaderShardFactory.createDistributedShard(outerListPath, Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier())); @@ -336,16 +325,27 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { public void testDistributedData() throws Exception { initEmptyDatastore("config"); - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); - leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)); - leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)); - leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)); + waitOnAsyncTask( + 
leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + + waitOnAsyncTask( + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + + waitOnAsyncTask( + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + + waitOnAsyncTask( + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); @@ -364,9 +364,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { for (int i = 0; i < 10; i++) { LOG.debug("Round {}", i); - final DistributedShardRegistration reg1 = leaderShardFactory - .createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME)); + final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); @@ -374,11 +374,10 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - reg1.close(); + waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - } } -- 2.36.6
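For reference, a minimal caller-side sketch of the asynchronous API this patch introduces: createDistributedShard() now returns a CompletionStage that completes once the backend and frontend shards are spawned, and DistributedShardRegistration.close() returns a CompletionStage that completes once the backend shard is removed. The ShardCreationExample class, the injectable factory instance, the "member-1" name and the 30-second timeout below are illustrative assumptions, not values taken from the patch; only the createDistributedShard() and close() signatures come from the change itself.

import java.util.Collections;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;

public final class ShardCreationExample {

    private ShardCreationExample() {
    }

    static void createAndRemoveShard(final DistributedShardFactory factory) throws Exception {
        final DOMDataTreeIdentifier prefix =
                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);

        // The shard is only usable once both the backend and frontend shards are spawned,
        // which is why the factory hands back a CompletionStage instead of a plain registration.
        final CompletionStage<DistributedShardRegistration> created = factory.createDistributedShard(
                prefix, Collections.singletonList(MemberName.forName("member-1")));

        // Blocking keeps the example short; production callers would chain thenAccept()/thenCompose().
        final DistributedShardRegistration registration =
                created.toCompletableFuture().get(30, TimeUnit.SECONDS);

        // Removal is asynchronous as well: the returned stage completes once the backend shard is gone.
        registration.close().toCompletableFuture().get(30, TimeUnit.SECONDS);
    }
}

The test code in the patch does the equivalent through AbstractTest.waitOnAsyncTask(), which converts the CompletionStage back to a Scala Future and awaits it with SHARD_FUTURE_TIMEOUT_DURATION.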