From: Robert Varga Date: Thu, 25 Jun 2020 14:31:24 +0000 (+0200) Subject: Split modifications on datastore root X-Git-Tag: v2.0.3~14 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f697416012940c229747715d10d632037e718611 Split modifications on datastore root When we are dealing with modifications, we must not assume they are not spanning shards. Specifically, we need to make sure we split data when the user is targeting datastore root, as for those cases we need to split top-level containers to their appropriate shards, not the default shard. JIRA: CONTROLLER-1950 Change-Id: Ie8ad233fed993fddd54a1e4865884fd4474fc81f Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 98e115efc6..9e5985a741 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -7,7 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; @@ -20,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -40,8 +43,13 @@ import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggrega import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -59,6 +67,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextWrappers = new TreeMap<>(); private final AbstractTransactionContextFactory txContextFactory; @@ -139,27 +148,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { checkModificationState("merge", path); - executeModification(new MergeOperation(path, data)); + if (path.isEmpty()) { + mergeAllData(checkRootData(data)); + } else { + executeModification(new MergeOperation(path, data)); + } + } + + private void mergeAllData(final ContainerNode rootData) { + // Populate requests for individual shards that are being touched + final Map> rootBuilders = new HashMap<>(); + for (DataContainerChild child : rootData.getValue()) { + final String shardName = shardNameFromRootChild(child); + rootBuilders.computeIfAbsent(shardName, + unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier())) + .addChild(child); + } + + // Now dispatch all merges + for (Entry> entry : rootBuilders.entrySet()) { + getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation( + YangInstanceIdentifier.empty(), entry.getValue().build())); + } } @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState("write", path); - executeModification(new WriteOperation(path, data)); + if (path.isEmpty()) { + writeAllData(checkRootData(data)); + } else { + executeModification(new WriteOperation(path, data)); + } + } + + private void writeAllData(final ContainerNode rootData) { + // Open builders for all shards + final Map> rootBuilders = new HashMap<>(); + for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) { + rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier())); + } + + // Now distribute children as needed + for (DataContainerChild child : rootData.getValue()) { + final String shardName = shardNameFromRootChild(child); + verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child); + } + + // Now dispatch all writes + for (Entry> entry : rootBuilders.entrySet()) { + getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation( + YangInstanceIdentifier.empty(), entry.getValue().build())); + } } private void executeModification(final TransactionModificationOperation operation) { getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation); } + private static ContainerNode checkRootData(final NormalizedNode data) { + // Root has to be a container + checkArgument(data instanceof ContainerNode, "Invalid root data %s", data); + return (ContainerNode) data; + } + private void checkModificationState(final String opName, final YangInstanceIdentifier path) { checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed"); @@ -283,8 +353,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction child) { + return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier())); + } + private String shardNameFromIdentifier(final YangInstanceIdentifier path) { - return txContextFactory.getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path); + return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path); } private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) { @@ -310,7 +384,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction