Implement scatter/gather on module shards 78/100478/3
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 7 Apr 2022 15:45:48 +0000 (17:45 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 8 Apr 2022 02:22:44 +0000 (04:22 +0200)
ClientBackedDatastore does perform module shard root scatter/gather,
hence reads of root end up talking to only the default shard.

Refactor the scatter/gather logic from DistributedDatastore into a
common component and use it for both modes of operation.

JIRA: CONTROLLER-2038
Change-Id: Ib04efab46e36e512eb89606d026fd2082de1b693
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
15 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/RootScatterGather.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehaviorTest.java

index 003c073de8ea12a101e9542cd4dab9259a8ae946..d10627dcf93e065df03c5f5130da50303d9ecc79 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -107,10 +108,16 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
     }
 
     final T ensureProxy(final YangInstanceIdentifier path) {
-        final State<T> local = getState();
-        final Long shard = parent.resolveShardForPath(path);
+        return ensureProxy(getState(), parent.resolveShardForPath(path));
+    }
+
+    private T ensureProxy(final State<T> localState, final Long shard) {
+        return localState.computeIfAbsent(shard, this::createProxy);
+    }
 
-        return local.computeIfAbsent(shard, this::createProxy);
+    final Stream<T> ensureAllProxies() {
+        final var local = getState();
+        return parent.resolveAllShards().map(shard -> ensureProxy(local, shard));
     }
 
     final AbstractClientHistory parent() {
index 95552b382e21e4c25cf63d8db25ab3e9caec99d7..796c23614e2fa220660bb09c6e7db0f84162b8df 100644 (file)
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
@@ -31,6 +32,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -117,6 +119,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
         return client.resolveShardForPath(path);
     }
 
+    final Stream<Long> resolveAllShards() {
+        return client.resolveAllShards();
+    }
+
+    final ActorUtils actorUtils() {
+        return client.actorUtils();
+    }
+
     @Override
     final void localAbort(final Throwable cause) {
         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
index 4f91cb27fae151a26ba5c49dbd8ba12498dc8238..7187f83a1ac060d41341c110181877b9535b2985 100644 (file)
@@ -17,13 +17,14 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.StampedLock;
-import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import java.util.stream.Stream;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +72,7 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     private volatile Throwable aborted;
 
     AbstractDataStoreClientBehavior(final ClientActorContext context,
-            final BackendInfoResolver<ShardBackendInfo> resolver) {
+            final AbstractShardBackendResolver resolver) {
         super(context, resolver);
         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
     }
@@ -224,4 +225,10 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     }
 
     abstract Long resolveShardForPath(YangInstanceIdentifier path);
+
+    abstract Stream<Long> resolveAllShards();
+
+    final ActorUtils actorUtils() {
+        return ((AbstractShardBackendResolver) resolver()).actorUtils();
+    }
 }
index d20a618c3df0c2d97feb8fbf951d6168a00cd5cb..5ba842aae3635fc6514c0a12b5dd59efd61fcbb3 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.annotations.Beta;
 import com.google.common.util.concurrent.FluentFuture;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -28,19 +29,20 @@ public class ClientSnapshot extends AbstractClientHandle<AbstractProxyTransactio
     }
 
     public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
-        return ensureSnapshotProxy(path).exists(path);
+        return ensureProxy(path).exists(path);
     }
 
     public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
-        return ensureSnapshotProxy(path).read(path);
+        return path.isEmpty() ? readRoot() : ensureProxy(path).read(path);
+    }
+
+    private FluentFuture<Optional<NormalizedNode>> readRoot() {
+        return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies()
+            .map(proxy -> proxy.read(YangInstanceIdentifier.empty())));
     }
 
     @Override
     final AbstractProxyTransaction createProxy(final Long shard) {
         return parent().createSnapshotProxy(getIdentifier(), shard);
     }
-
-    private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) {
-        return ensureProxy(path);
-    }
 }
index 7cdc04aba17b471ea218e3e8060141b9a8a2451e..4fbb49a61d3f23888c99e2d4f258a8601c8dc091 100644 (file)
@@ -14,9 +14,13 @@ import com.google.common.util.concurrent.FluentFuture;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
@@ -55,28 +59,57 @@ public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransac
         super(parent, transactionId);
     }
 
-    private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) {
-        return ensureProxy(path);
-    }
-
     public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
-        return ensureTransactionProxy(path).exists(path);
+        return ensureProxy(path).exists(path);
     }
 
     public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
-        return ensureTransactionProxy(path).read(path);
+        return path.isEmpty() ? readRoot() : ensureProxy(path).read(path);
+    }
+
+    private FluentFuture<Optional<NormalizedNode>> readRoot() {
+        return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies()
+            .map(proxy -> proxy.read(YangInstanceIdentifier.empty())));
     }
 
     public void delete(final YangInstanceIdentifier path) {
-        ensureTransactionProxy(path).delete(path);
+        if (path.isEmpty()) {
+            ensureAllProxies().forEach(proxy -> proxy.delete(YangInstanceIdentifier.empty()));
+        } else {
+            ensureProxy(path).delete(path);
+        }
     }
 
     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
-        ensureTransactionProxy(path).merge(path, data);
+        if (path.isEmpty()) {
+            mergeRoot(RootScatterGather.castRootNode(data));
+        } else {
+            ensureProxy(path).merge(path, data);
+        }
+    }
+
+    private void mergeRoot(final @NonNull ContainerNode rootData) {
+        if (!rootData.isEmpty()) {
+            RootScatterGather.scatterTouched(rootData, this::ensureProxy).forEach(
+                scattered -> scattered.shard().merge(YangInstanceIdentifier.empty(), scattered.container()));
+        }
     }
 
     public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
-        ensureTransactionProxy(path).write(path, data);
+        if (path.isEmpty()) {
+            writeRoot(RootScatterGather.castRootNode(data));
+        } else {
+            ensureProxy(path).write(path, data);
+        }
+    }
+
+    private void writeRoot(final @NonNull ContainerNode rootData) {
+        RootScatterGather.scatterAll(rootData, this::ensureProxy, ensureAllProxies()).forEach(
+            scattered -> scattered.shard().write(YangInstanceIdentifier.empty(), scattered.container()));
+    }
+
+    private AbstractProxyTransaction ensureProxy(final PathArgument childId) {
+        return ensureProxy(YangInstanceIdentifier.create(childId));
     }
 
     public DOMStoreThreePhaseCommitCohort ready() {
index e40da21d138c2c5bde8876bae053bdf3f96db03b..f8927c28c859f0eab65c00efccd7a8a304baf109 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import java.util.function.Function;
+import java.util.stream.Stream;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -18,12 +18,12 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * @author Robert Varga
  */
 final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
-    private final Function<YangInstanceIdentifier, Long> pathToShard;
+    private final ModuleShardBackendResolver resolver;
 
     private DistributedDataStoreClientBehavior(final ClientActorContext context,
             final ModuleShardBackendResolver resolver) {
         super(context, resolver);
-        pathToShard = resolver::resolveShardForPath;
+        this.resolver = resolver;
     }
 
     DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorUtils actorUtils) {
@@ -32,7 +32,12 @@ final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBe
 
     @Override
     Long resolveShardForPath(final YangInstanceIdentifier path) {
-        return pathToShard.apply(path);
+        return resolver.resolveShardForPath(path);
+    }
+
+    @Override
+    Stream<Long> resolveAllShards() {
+        return resolver.resolveAllShards();
     }
 
     @Override
index 61bb78ed3fc464d7abe502be7c8f8b833df0d773..ee887b00faca112951952d18715259871904cb84 100644 (file)
@@ -18,6 +18,7 @@ import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
@@ -84,7 +85,16 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
     }
 
     Long resolveShardForPath(final YangInstanceIdentifier path) {
-        final String shardName = actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+        return resolveCookie(actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path));
+    }
+
+    Stream<Long> resolveAllShards() {
+        return actorUtils().getConfiguration().getAllShardNames().stream()
+            .sorted()
+            .map(this::resolveCookie);
+    }
+
+    private @NonNull Long resolveCookie(final String shardName) {
         final Long cookie = shards.get(shardName);
         return cookie != null ? cookie : populateShard(shardName);
     }
index aaaa88e8b6b2494e535ecab198f0f82c590f5ebd..984a4e4f0c81158fe90b404600e6ab8f9efec8ef 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.stream.Stream;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -18,7 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  */
 final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
     // Pre-boxed instance
-    private static final Long ZERO = Long.valueOf(0);
+    private static final Long ZERO = 0L;
 
     private SimpleDataStoreClientBehavior(final ClientActorContext context,
             final SimpleShardBackendResolver resolver) {
@@ -34,4 +35,9 @@ final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavio
     Long resolveShardForPath(final YangInstanceIdentifier path) {
         return ZERO;
     }
+
+    @Override
+    Stream<Long> resolveAllShards() {
+        return Stream.of(ZERO);
+    }
 }
index da9789693cdca41e1be07f428775a2c74403bb29..43bf09eb8aceee72089911717bd8d6b5e6c2c556 100644 (file)
@@ -7,27 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -39,17 +31,13 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -95,7 +83,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
 
         final SettableFuture<T> proxyFuture = SettableFuture.create();
-        AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName);
+        AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
@@ -121,27 +109,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private FluentFuture<Optional<NormalizedNode>> readAllData() {
-        final Set<String> allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames();
-        final Collection<FluentFuture<Optional<NormalizedNode>>> futures = new ArrayList<>(allShardNames.size());
-
-        for (String shardName : allShardNames) {
-            futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty()));
-        }
-
-        final ListenableFuture<List<Optional<NormalizedNode>>> listFuture = Futures.allAsList(futures);
-        final ListenableFuture<Optional<NormalizedNode>> aggregateFuture;
-
-        aggregateFuture = Futures.transform(listFuture, input -> {
-            try {
-                return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
-                        txContextFactory.getActorUtils().getSchemaContext(),
-                        txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
-            } catch (DataValidationFailedException e) {
-                throw new IllegalArgumentException("Failed to aggregate", e);
-            }
-        }, MoreExecutors.directExecutor());
-
-        return FluentFuture.from(aggregateFuture);
+        final var actorUtils = getActorUtils();
+        return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
+            .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty())));
     }
 
     @Override
@@ -157,7 +127,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private void deleteAllData() {
         for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
-            getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+            wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
         }
     }
 
@@ -166,26 +136,17 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         checkModificationState("merge", path);
 
         if (path.isEmpty()) {
-            mergeAllData(checkRootData(data));
+            mergeAllData(RootScatterGather.castRootNode(data));
         } else {
             executeModification(new MergeOperation(path, data));
         }
     }
 
     private void mergeAllData(final ContainerNode rootData) {
-        // Populate requests for individual shards that are being touched
-        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
-        for (DataContainerChild child : rootData.body()) {
-            final String shardName = shardNameFromRootChild(child);
-            rootBuilders.computeIfAbsent(shardName,
-                unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
-                .addChild(child);
-        }
-
-        // Now dispatch all merges
-        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
-            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
-                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        if (!rootData.isEmpty()) {
+            RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
+                scattered -> scattered.shard().maybeExecuteTransactionOperation(
+                    new MergeOperation(YangInstanceIdentifier.empty(), scattered.container())));
         }
     }
 
@@ -194,40 +155,21 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         checkModificationState("write", path);
 
         if (path.isEmpty()) {
-            writeAllData(checkRootData(data));
+            writeAllData(RootScatterGather.castRootNode(data));
         } else {
             executeModification(new WriteOperation(path, data));
         }
     }
 
     private void writeAllData(final ContainerNode rootData) {
-        // Open builders for all shards
-        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
-        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
-            rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
-        }
-
-        // Now distribute children as needed
-        for (DataContainerChild child : rootData.body()) {
-            final String shardName = shardNameFromRootChild(child);
-            verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
-        }
-
-        // Now dispatch all writes
-        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
-            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
-                YangInstanceIdentifier.empty(), entry.getValue().build()));
-        }
+        RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
+            getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
+                scattered -> scattered.shard().maybeExecuteTransactionOperation(
+                    new WriteOperation(YangInstanceIdentifier.empty(), scattered.container())));
     }
 
     private void executeModification(final TransactionModificationOperation operation) {
-        getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
-    }
-
-    private static ContainerNode checkRootData(final NormalizedNode data) {
-        // Root has to be a container
-        checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
-        return (ContainerNode) data;
+        wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
     }
 
     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
@@ -353,19 +295,15 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
     }
 
-    private String shardNameFromRootChild(final DataContainerChild child) {
-        return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
-    }
-
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
         return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
-    private AbstractTransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
-        return getContextWrapper(shardNameFromIdentifier(path));
+    private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
+        return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.create(childId)));
     }
 
-    private AbstractTransactionContextWrapper getContextWrapper(final String shardName) {
+    private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
         final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
         if (existing != null) {
             return existing;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/RootScatterGather.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/RootScatterGather.java
new file mode 100644 (file)
index 0000000..4848a49
--- /dev/null
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+
+/**
+ * Utility methods for dealing with datastore root {@link ContainerNode} with respect to module shards.
+ */
+public final class RootScatterGather {
+    // FIXME: Record when we have JDK17+
+    @NonNullByDefault
+    public static final class ShardContainer<T> {
+        private final ContainerNode container;
+        private final T shard;
+
+        ShardContainer(final T shard, final ContainerNode container) {
+            this.shard = requireNonNull(shard);
+            this.container = requireNonNull(container);
+        }
+
+        public T shard() {
+            return shard;
+        }
+
+        public ContainerNode container() {
+            return container;
+        }
+
+        @Override
+        public int hashCode() {
+            return shard.hashCode();
+        }
+
+        @Override
+        public boolean equals(final @Nullable Object obj) {
+            return obj == this || obj instanceof ShardContainer && shard.equals(((ShardContainer<?>) obj).shard);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("shard", shard).toString();
+        }
+    }
+
+    private RootScatterGather() {
+        // Hidden on purpose
+    }
+
+    /**
+     * Check whether a {@link NormalizedNode} represents a root container and return it cast to {@link ContainerNode}.
+     *
+     * @param node a normalized node
+     * @return {@code node} cast to ContainerNode
+     * @throws NullPointerException if {@code node} is null
+     * @throws IllegalArgumentException if {@code node} is not a {@link ContainerNode}
+     */
+    public static @NonNull ContainerNode castRootNode(final NormalizedNode node) {
+        final var nonnull = requireNonNull(node);
+        checkArgument(nonnull instanceof ContainerNode, "Invalid root data %s", nonnull);
+        return (ContainerNode) nonnull;
+    }
+
+    /**
+     * Reconstruct root container from a set of constituents.
+     *
+     * @param actorUtils {@link ActorUtils} reference
+     * @param readFutures Consitutent read futures
+     * @return A composite future
+     */
+    public static @NonNull FluentFuture<Optional<NormalizedNode>> gather(final ActorUtils actorUtils,
+            final Stream<FluentFuture<Optional<NormalizedNode>>> readFutures) {
+        return FluentFuture.from(Futures.transform(
+            Futures.allAsList(readFutures.collect(ImmutableList.toImmutableList())), input -> {
+                try {
+                    return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
+                        actorUtils.getSchemaContext(), actorUtils.getDatastoreContext().getLogicalStoreType());
+                } catch (DataValidationFailedException e) {
+                    throw new IllegalArgumentException("Failed to aggregate", e);
+                }
+            }, MoreExecutors.directExecutor()));
+    }
+
+    public static <T> @NonNull Stream<ShardContainer<T>> scatterAll(final ContainerNode rootNode,
+            final Function<PathArgument, T> childToShard, final Stream<T> allShards) {
+        final var builders = allShards
+            .collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> Builders.containerBuilder()));
+        for (var child : rootNode.body()) {
+            final var shard = childToShard.apply(child.getIdentifier());
+            verifyNotNull(builders.get(shard), "Failed to find builder for %s", shard).addChild(child);
+        }
+        return streamContainers(rootNode.getIdentifier(), builders);
+    }
+
+    /**
+     * Split root container into per-shard root containers.
+     *
+     * @param <T> Shard reference type
+     * @param rootNode Root container to be split up
+     * @param childToShard Mapping function from child {@link PathArgument} to shard reference
+     * @return Stream of {@link ShardContainer}s, one for each touched shard
+     */
+    public static <T> @NonNull Stream<ShardContainer<T>> scatterTouched(final ContainerNode rootNode,
+            final Function<PathArgument, T> childToShard) {
+        final var builders = new HashMap<T, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>>();
+        for (var child : rootNode.body()) {
+            builders.computeIfAbsent(childToShard.apply(child.getIdentifier()), unused -> Builders.containerBuilder())
+                .addChild(child);
+        }
+        return streamContainers(rootNode.getIdentifier(), builders);
+    }
+
+    private static <T> @NonNull Stream<ShardContainer<T>> streamContainers(final NodeIdentifier rootId,
+            final Map<T, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> builders) {
+        return builders.entrySet().stream()
+            .map(entry -> new ShardContainer<>(entry.getKey(), entry.getValue().withNodeIdentifier(rootId).build()));
+    }
+}
index d8ff9f8b5dd2236496c3540889f1446cd61d75ad..83a00ff19fdccf6098ff5b39998880ebc29ec6bb 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
@@ -44,11 +45,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import scala.concurrent.Promise;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
@@ -87,7 +90,7 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
         final InternalCommand<ShardBackendInfo> command = clientContextProbe.expectMsgClass(InternalCommand.class);
         command.execute(client);
         //data tree mock
-        when(dataTree.takeSnapshot()).thenReturn(dataTreeSnapshot);
+        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
 
         handle = createHandle(parent);
     }
@@ -201,8 +204,13 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
         final ActorSelection selection = system.actorSelection(actor.path());
         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
         promise.success(shardInfo);
-        when(mock.findPrimaryShardAsync(any())).thenReturn(promise.future());
+        doReturn(promise.future()).when(mock).findPrimaryShardAsync(any());
+
+        final EffectiveModelContext context = mock(EffectiveModelContext.class);
+        lenient().doCallRealMethod().when(context).getQName();
+        lenient().doReturn(context).when(mock).getSchemaContext();
+        lenient().doReturn(DatastoreContext.newBuilder().build()).when(mock).getDatastoreContext();
+
         return mock;
     }
-
 }
index 0b379f9655454eceb73c926218b8f688c9bee0fc..14a29c5101394be7a971802769bfa7f1ecd40e25 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
 
 import akka.actor.ActorRef;
@@ -134,14 +134,15 @@ public abstract class AbstractDataStoreClientBehaviorTest {
     public void testGetConnection() {
         //set up data tree mock
         final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
-        when(modification.readNode(YangInstanceIdentifier.empty())).thenReturn(Optional.empty());
+        doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.empty());
         final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
-        when(snapshot.newModification()).thenReturn(modification);
+        doReturn(modification).when(snapshot).newModification();
         final DataTree dataTree = mock(DataTree.class);
-        when(dataTree.takeSnapshot()).thenReturn(snapshot);
+        doReturn(snapshot).when(dataTree).takeSnapshot();
 
         final TestProbe backendProbe = new TestProbe(system, "backend");
         final long shard = 0L;
+
         behavior.createTransaction().read(YangInstanceIdentifier.empty());
         final AbstractClientConnection<ShardBackendInfo> connection = behavior.getConnection(shard);
         //check cached connection for same shard
@@ -167,8 +168,7 @@ public abstract class AbstractDataStoreClientBehaviorTest {
         final ActorSelection selection = system.actorSelection(actor.path());
         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
         promise.success(shardInfo);
-        when(mock.findPrimaryShardAsync(SHARD)).thenReturn(promise.future());
+        doReturn(promise.future()).when(mock).findPrimaryShardAsync(SHARD);
         return mock;
     }
-
 }
index 9e8c33254418ad04d9799980a833ad83ac1a4297..c5cdeb7df8f13c42e0ffe6c85c83eda16f7879fb 100644 (file)
@@ -9,26 +9,23 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class ClientSnapshotTest extends AbstractClientHandleTest<ClientSnapshot> {
-
     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
 
     @Before
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        when(getDataTreeSnapshot().readNode(PATH)).thenReturn(Optional.empty());
+        doReturn(Optional.empty()).when(getDataTreeSnapshot()).readNode(PATH);
     }
 
     @Override
@@ -43,15 +40,15 @@ public class ClientSnapshotTest extends AbstractClientHandleTest<ClientSnapshot>
 
     @Test
     public void testExists() throws Exception {
-        final ListenableFuture<Boolean> exists = getHandle().exists(PATH);
+        final var exists = getHandle().exists(PATH);
         verify(getDataTreeSnapshot()).readNode(PATH);
         assertEquals(Boolean.FALSE, getWithTimeout(exists));
     }
 
     @Test
     public void testRead() throws Exception {
-        final ListenableFuture<Optional<NormalizedNode>> exists = getHandle().read(PATH);
+        final var read = getHandle().read(PATH);
         verify(getDataTreeSnapshot()).readNode(PATH);
-        assertFalse(getWithTimeout(exists).isPresent());
+        assertFalse(getWithTimeout(read).isPresent());
     }
 }
index f8c469e3d22a33c263105ea61b4738cce7cad5ca..4cd21b160d7ab4aa133ddbed048839807afdffa4 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
@@ -39,7 +40,7 @@ public class ClientTransactionTest extends AbstractClientHandleTest<ClientTransa
             .node(QName.create("ns-1", "node-1"))
             .build();
     private static final ContainerNode DATA = Builders.containerBuilder()
-            .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(PATH.getLastPathArgument().getNodeType()))
+            .withNodeIdentifier(NodeIdentifier.create(PATH.getLastPathArgument().getNodeType()))
             .build();
 
     @Mock
index a546955ab82ae306abfa9126e9a49edfa6ce2597..312edb335c9a01ad195af5fdb3ac984a6e252d50 100644 (file)
@@ -8,10 +8,12 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
+import java.util.Set;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
@@ -20,11 +22,16 @@ public class DistributedDataStoreClientBehaviorTest extends AbstractDataStoreCli
     @Override
     protected AbstractDataStoreClientBehavior createBehavior(final ClientActorContext clientContext,
                                                              final ActorUtils context) {
-        final ShardStrategyFactory factory = mock(ShardStrategyFactory.class);
         final ShardStrategy strategy = mock(ShardStrategy.class);
-        when(strategy.findShard(any())).thenReturn(SHARD);
-        when(factory.getStrategy(any())).thenReturn(strategy);
-        when(context.getShardStrategyFactory()).thenReturn(factory);
+        doReturn(SHARD).when(strategy).findShard(any());
+        final ShardStrategyFactory factory = mock(ShardStrategyFactory.class);
+        doReturn(strategy).when(factory).getStrategy(any());
+        doReturn(factory).when(context).getShardStrategyFactory();
+
+        final Configuration config = mock(Configuration.class);
+        doReturn(Set.of(SHARD)).when(config).getAllShardNames();
+        doReturn(config).when(context).getConfiguration();
+
         return new DistributedDataStoreClientBehavior(clientContext, context);
     }
 }