Implement scatter/gather on module shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index da9789693cdca41e1be07f428775a2c74403bb29..43bf09eb8aceee72089911717bd8d6b5e6c2c556 100644 (file)
@@ -7,27 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -39,17 +31,13 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -95,7 +83,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
 
         final SettableFuture<T> proxyFuture = SettableFuture.create();
-        AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName);
+        AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
@@ -121,27 +109,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private FluentFuture<Optional<NormalizedNode>> readAllData() {
-        final Set<String> allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames();
-        final Collection<FluentFuture<Optional<NormalizedNode>>> futures = new ArrayList<>(allShardNames.size());
-
-        for (String shardName : allShardNames) {
-            futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty()));
-        }
-
-        final ListenableFuture<List<Optional<NormalizedNode>>> listFuture = Futures.allAsList(futures);
-        final ListenableFuture<Optional<NormalizedNode>> aggregateFuture;
-
-        aggregateFuture = Futures.transform(listFuture, input -> {
-            try {
-                return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
-                        txContextFactory.getActorUtils().getSchemaContext(),
-                        txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
-            } catch (DataValidationFailedException e) {
-                throw new IllegalArgumentException("Failed to aggregate", e);
-            }
-        }, MoreExecutors.directExecutor());
-
-        return FluentFuture.from(aggregateFuture);
+        final var actorUtils = getActorUtils();
+        return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
+            .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty())));
     }
 
     @Override
@@ -157,7 +127,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private void deleteAllData() {
         for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
-            getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+            wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
         }
     }
 
@@ -166,26 +136,17 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         checkModificationState("merge", path);
 
         if (path.isEmpty()) {
-            mergeAllData(checkRootData(data));
+            mergeAllData(RootScatterGather.castRootNode(data));
         } else {
             executeModification(new MergeOperation(path, data));
         }
     }
 
     private void mergeAllData(final ContainerNode rootData) {
-        // Populate requests for individual shards that are being touched
-        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
-        for (DataContainerChild child : rootData.body()) {
-            final String shardName = shardNameFromRootChild(child);
-            rootBuilders.computeIfAbsent(shardName,
-                unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
-                .addChild(child);
-        }
-
-        // Now dispatch all merges
-        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
-            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
-                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        if (!rootData.isEmpty()) {
+            RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
+                scattered -> scattered.shard().maybeExecuteTransactionOperation(
+                    new MergeOperation(YangInstanceIdentifier.empty(), scattered.container())));
         }
     }
 
@@ -194,40 +155,21 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         checkModificationState("write", path);
 
         if (path.isEmpty()) {
-            writeAllData(checkRootData(data));
+            writeAllData(RootScatterGather.castRootNode(data));
         } else {
             executeModification(new WriteOperation(path, data));
         }
     }
 
     private void writeAllData(final ContainerNode rootData) {
-        // Open builders for all shards
-        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
-        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
-            rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
-        }
-
-        // Now distribute children as needed
-        for (DataContainerChild child : rootData.body()) {
-            final String shardName = shardNameFromRootChild(child);
-            verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
-        }
-
-        // Now dispatch all writes
-        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
-            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
-                YangInstanceIdentifier.empty(), entry.getValue().build()));
-        }
+        RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
+            getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
+                scattered -> scattered.shard().maybeExecuteTransactionOperation(
+                    new WriteOperation(YangInstanceIdentifier.empty(), scattered.container())));
     }
 
     private void executeModification(final TransactionModificationOperation operation) {
-        getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
-    }
-
-    private static ContainerNode checkRootData(final NormalizedNode data) {
-        // Root has to be a container
-        checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
-        return (ContainerNode) data;
+        wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
     }
 
     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
@@ -353,19 +295,15 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
     }
 
-    private String shardNameFromRootChild(final DataContainerChild child) {
-        return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
-    }
-
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
         return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
-    private AbstractTransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
-        return getContextWrapper(shardNameFromIdentifier(path));
+    private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
+        return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.create(childId)));
     }
 
-    private AbstractTransactionContextWrapper getContextWrapper(final String shardName) {
+    private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
         final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
         if (existing != null) {
             return existing;