Split modifications on datastore root
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 98e115efc6270342e4f54ce99f5dcf0e95d99bc8..9e5985a741738d9010ba5a53567d462c41911f7c 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
@@ -20,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -40,8 +43,13 @@ import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggrega
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -59,6 +67,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+    private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
 
     private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
@@ -139,27 +148,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     public void delete(final YangInstanceIdentifier path) {
         checkModificationState("delete", path);
 
-        executeModification(new DeleteOperation(path));
+        if (path.isEmpty()) {
+            deleteAllData();
+        } else {
+            executeModification(new DeleteOperation(path));
+        }
+    }
+
+    private void deleteAllData() {
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+        }
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("merge", path);
 
-        executeModification(new MergeOperation(path, data));
+        if (path.isEmpty()) {
+            mergeAllData(checkRootData(data));
+        } else {
+            executeModification(new MergeOperation(path, data));
+        }
+    }
+
+    private void mergeAllData(final ContainerNode rootData) {
+        // Populate requests for individual shards that are being touched
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            rootBuilders.computeIfAbsent(shardName,
+                unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
+                .addChild(child);
+        }
+
+        // Now dispatch all merges
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
     }
 
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("write", path);
 
-        executeModification(new WriteOperation(path, data));
+        if (path.isEmpty()) {
+            writeAllData(checkRootData(data));
+        } else {
+            executeModification(new WriteOperation(path, data));
+        }
+    }
+
+    private void writeAllData(final ContainerNode rootData) {
+        // Open builders for all shards
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
+        }
+
+        // Now distribute children as needed
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
+        }
+
+        // Now dispatch all writes
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
     }
 
     private void executeModification(final TransactionModificationOperation operation) {
         getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
     }
 
+    private static ContainerNode checkRootData(final NormalizedNode<?, ?> data) {
+        // Root has to be a container
+        checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
+        return (ContainerNode) data;
+    }
+
     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
         checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
         checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
@@ -283,8 +353,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
     }
 
+    private String shardNameFromRootChild(final DataContainerChild<?, ?> child) {
+        return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
+    }
+
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
-        return txContextFactory.getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+        return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
     private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
@@ -310,7 +384,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return state != TransactionState.OPEN;
     }
 
-    ActorUtils getActorUtils() {
+    final ActorUtils getActorUtils() {
         return txContextFactory.getActorUtils();
     }
 }