Split modifications on datastore root 99/90699/10
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 25 Jun 2020 14:31:24 +0000 (16:31 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 26 Jun 2020 12:32:38 +0000 (14:32 +0200)
When we are dealing with modifications, we must not assume they are
not spanning shards. Specifically, we need to make sure we split
data when the user is targeting datastore root, as for those cases
we need to split top-level containers to their appropriate shards,
not the default shard.

JIRA: CONTROLLER-1950
Change-Id: Ie8ad233fed993fddd54a1e4865884fd4474fc81f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java

index 98e115efc6270342e4f54ce99f5dcf0e95d99bc8..9e5985a741738d9010ba5a53567d462c41911f7c 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
@@ -20,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -40,8 +43,13 @@ import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggrega
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -59,6 +67,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+    private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
 
     private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
@@ -139,27 +148,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     public void delete(final YangInstanceIdentifier path) {
         checkModificationState("delete", path);
 
-        executeModification(new DeleteOperation(path));
+        if (path.isEmpty()) {
+            deleteAllData();
+        } else {
+            executeModification(new DeleteOperation(path));
+        }
+    }
+
+    private void deleteAllData() {
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+        }
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("merge", path);
 
-        executeModification(new MergeOperation(path, data));
+        if (path.isEmpty()) {
+            mergeAllData(checkRootData(data));
+        } else {
+            executeModification(new MergeOperation(path, data));
+        }
+    }
+
+    private void mergeAllData(final ContainerNode rootData) {
+        // Populate requests for individual shards that are being touched
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            rootBuilders.computeIfAbsent(shardName,
+                unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
+                .addChild(child);
+        }
+
+        // Now dispatch all merges
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
     }
 
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("write", path);
 
-        executeModification(new WriteOperation(path, data));
+        if (path.isEmpty()) {
+            writeAllData(checkRootData(data));
+        } else {
+            executeModification(new WriteOperation(path, data));
+        }
+    }
+
+    private void writeAllData(final ContainerNode rootData) {
+        // Open builders for all shards
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
+        }
+
+        // Now distribute children as needed
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
+        }
+
+        // Now dispatch all writes
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
     }
 
     private void executeModification(final TransactionModificationOperation operation) {
         getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
     }
 
+    private static ContainerNode checkRootData(final NormalizedNode<?, ?> data) {
+        // Root has to be a container
+        checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
+        return (ContainerNode) data;
+    }
+
     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
         checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
         checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
@@ -283,8 +353,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
     }
 
+    private String shardNameFromRootChild(final DataContainerChild<?, ?> child) {
+        return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
+    }
+
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
-        return txContextFactory.getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+        return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
     private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
@@ -310,7 +384,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return state != TransactionState.OPEN;
     }
 
-    ActorUtils getActorUtils() {
+    final ActorUtils getActorUtils() {
         return txContextFactory.getActorUtils();
     }
 }