From 438d5278050961f0773707e6508b1b736eac1c0f Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 7 Apr 2022 17:45:48 +0200 Subject: [PATCH] Implement scatter/gather on module shards ClientBackedDatastore does perform module shard root scatter/gather, hence reads of root end up talking to only the default shard. Refactor the scatter/gather logic from DistributedDatastore into a common component and use it for both modes of operation. JIRA: CONTROLLER-2038 Change-Id: Ib04efab46e36e512eb89606d026fd2082de1b693 Signed-off-by: Robert Varga --- .../actors/dds/AbstractClientHandle.java | 13 +- .../actors/dds/AbstractClientHistory.java | 10 ++ .../dds/AbstractDataStoreClientBehavior.java | 11 +- .../databroker/actors/dds/ClientSnapshot.java | 14 +- .../actors/dds/ClientTransaction.java | 51 ++++-- .../DistributedDataStoreClientBehavior.java | 13 +- .../dds/ModuleShardBackendResolver.java | 12 +- .../dds/SimpleDataStoreClientBehavior.java | 8 +- .../cluster/datastore/TransactionProxy.java | 104 +++--------- .../datastore/utils/RootScatterGather.java | 148 ++++++++++++++++++ .../actors/dds/AbstractClientHandleTest.java | 16 +- .../AbstractDataStoreClientBehaviorTest.java | 12 +- .../actors/dds/ClientSnapshotTest.java | 13 +- .../actors/dds/ClientTransactionTest.java | 3 +- ...istributedDataStoreClientBehaviorTest.java | 17 +- 15 files changed, 312 insertions(+), 133 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/RootScatterGather.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java index 003c073de8..d10627dcf9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java @@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.stream.Stream; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -107,10 +108,16 @@ public abstract class AbstractClientHandle e } final T ensureProxy(final YangInstanceIdentifier path) { - final State local = getState(); - final Long shard = parent.resolveShardForPath(path); + return ensureProxy(getState(), parent.resolveShardForPath(path)); + } + + private T ensureProxy(final State localState, final Long shard) { + return localState.computeIfAbsent(shard, this::createProxy); + } - return local.computeIfAbsent(shard, this::createProxy); + final Stream ensureAllProxies() { + final var local = getState(); + return parent.resolveAllShards().map(shard -> ensureProxy(local, shard)); } final AbstractClientHistory parent() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 95552b382e..796c23614e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; import org.checkerframework.checker.lock.qual.GuardedBy; import org.checkerframework.checker.lock.qual.Holding; import org.eclipse.jdt.annotation.NonNull; @@ -31,6 +32,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -117,6 +119,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id return client.resolveShardForPath(path); } + final Stream resolveAllShards() { + return client.resolveAllShards(); + } + + final ActorUtils actorUtils() { + return client.actorUtils(); + } + @Override final void localAbort(final Throwable cause) { final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index 4f91cb27fa..7187f83a1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -17,13 +17,14 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.StampedLock; -import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import java.util.stream.Stream; import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.client.ReconnectForwarder; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +72,7 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior resolver) { + final AbstractShardBackendResolver resolver) { super(context, resolver); singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } @@ -224,4 +225,10 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior resolveAllShards(); + + final ActorUtils actorUtils() { + return ((AbstractShardBackendResolver) resolver()).actorUtils(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java index d20a618c3d..5ba842aae3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java @@ -11,6 +11,7 @@ import com.google.common.annotations.Beta; import com.google.common.util.concurrent.FluentFuture; import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -28,19 +29,20 @@ public class ClientSnapshot extends AbstractClientHandle exists(final YangInstanceIdentifier path) { - return ensureSnapshotProxy(path).exists(path); + return ensureProxy(path).exists(path); } public FluentFuture> read(final YangInstanceIdentifier path) { - return ensureSnapshotProxy(path).read(path); + return path.isEmpty() ? readRoot() : ensureProxy(path).read(path); + } + + private FluentFuture> readRoot() { + return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies() + .map(proxy -> proxy.read(YangInstanceIdentifier.empty()))); } @Override final AbstractProxyTransaction createProxy(final Long shard) { return parent().createSnapshotProxy(getIdentifier(), shard); } - - private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) { - return ensureProxy(path); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 7cdc04aba1..4fbb49a61d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -14,9 +14,13 @@ import com.google.common.util.concurrent.FluentFuture; import java.util.Collection; import java.util.Map; import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; /** @@ -55,28 +59,57 @@ public class ClientTransaction extends AbstractClientHandle exists(final YangInstanceIdentifier path) { - return ensureTransactionProxy(path).exists(path); + return ensureProxy(path).exists(path); } public FluentFuture> read(final YangInstanceIdentifier path) { - return ensureTransactionProxy(path).read(path); + return path.isEmpty() ? readRoot() : ensureProxy(path).read(path); + } + + private FluentFuture> readRoot() { + return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies() + .map(proxy -> proxy.read(YangInstanceIdentifier.empty()))); } public void delete(final YangInstanceIdentifier path) { - ensureTransactionProxy(path).delete(path); + if (path.isEmpty()) { + ensureAllProxies().forEach(proxy -> proxy.delete(YangInstanceIdentifier.empty())); + } else { + ensureProxy(path).delete(path); + } } public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureTransactionProxy(path).merge(path, data); + if (path.isEmpty()) { + mergeRoot(RootScatterGather.castRootNode(data)); + } else { + ensureProxy(path).merge(path, data); + } + } + + private void mergeRoot(final @NonNull ContainerNode rootData) { + if (!rootData.isEmpty()) { + RootScatterGather.scatterTouched(rootData, this::ensureProxy).forEach( + scattered -> scattered.shard().merge(YangInstanceIdentifier.empty(), scattered.container())); + } } public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureTransactionProxy(path).write(path, data); + if (path.isEmpty()) { + writeRoot(RootScatterGather.castRootNode(data)); + } else { + ensureProxy(path).write(path, data); + } + } + + private void writeRoot(final @NonNull ContainerNode rootData) { + RootScatterGather.scatterAll(rootData, this::ensureProxy, ensureAllProxies()).forEach( + scattered -> scattered.shard().write(YangInstanceIdentifier.empty(), scattered.container())); + } + + private AbstractProxyTransaction ensureProxy(final PathArgument childId) { + return ensureProxy(YangInstanceIdentifier.create(childId)); } public DOMStoreThreePhaseCommitCohort ready() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index e40da21d13..f8927c28c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -7,7 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import java.util.function.Function; +import java.util.stream.Stream; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -18,12 +18,12 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * @author Robert Varga */ final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior { - private final Function pathToShard; + private final ModuleShardBackendResolver resolver; private DistributedDataStoreClientBehavior(final ClientActorContext context, final ModuleShardBackendResolver resolver) { super(context, resolver); - pathToShard = resolver::resolveShardForPath; + this.resolver = resolver; } DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorUtils actorUtils) { @@ -32,7 +32,12 @@ final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBe @Override Long resolveShardForPath(final YangInstanceIdentifier path) { - return pathToShard.apply(path); + return resolver.resolveShardForPath(path); + } + + @Override + Stream resolveAllShards() { + return resolver.resolveAllShards(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 61bb78ed3f..ee887b00fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.checkerframework.checker.lock.qual.GuardedBy; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; @@ -84,7 +85,16 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { } Long resolveShardForPath(final YangInstanceIdentifier path) { - final String shardName = actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path); + return resolveCookie(actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path)); + } + + Stream resolveAllShards() { + return actorUtils().getConfiguration().getAllShardNames().stream() + .sorted() + .map(this::resolveCookie); + } + + private @NonNull Long resolveCookie(final String shardName) { final Long cookie = shards.get(shardName); return cookie != null ? cookie : populateShard(shardName); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java index aaaa88e8b6..984a4e4f0c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import java.util.stream.Stream; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -18,7 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; */ final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior { // Pre-boxed instance - private static final Long ZERO = Long.valueOf(0); + private static final Long ZERO = 0L; private SimpleDataStoreClientBehavior(final ClientActorContext context, final SimpleShardBackendResolver resolver) { @@ -34,4 +35,9 @@ final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavio Long resolveShardForPath(final YangInstanceIdentifier path) { return ZERO; } + + @Override + Stream resolveAllShards() { + return Stream.of(ZERO); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index da9789693c..43bf09eb8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -7,27 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -39,17 +31,13 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; +import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather; import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -95,7 +83,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction proxyFuture = SettableFuture.create(); - AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName); + AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName); contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(final TransactionContext transactionContext, final Boolean havePermit) { @@ -121,27 +109,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> readAllData() { - final Set allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames(); - final Collection>> futures = new ArrayList<>(allShardNames.size()); - - for (String shardName : allShardNames) { - futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty())); - } - - final ListenableFuture>> listFuture = Futures.allAsList(futures); - final ListenableFuture> aggregateFuture; - - aggregateFuture = Futures.transform(listFuture, input -> { - try { - return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input, - txContextFactory.getActorUtils().getSchemaContext(), - txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType()); - } catch (DataValidationFailedException e) { - throw new IllegalArgumentException("Failed to aggregate", e); - } - }, MoreExecutors.directExecutor()); - - return FluentFuture.from(aggregateFuture); + final var actorUtils = getActorUtils(); + return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream() + .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty()))); } @Override @@ -157,7 +127,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> rootBuilders = new HashMap<>(); - for (DataContainerChild child : rootData.body()) { - final String shardName = shardNameFromRootChild(child); - rootBuilders.computeIfAbsent(shardName, - unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier())) - .addChild(child); - } - - // Now dispatch all merges - for (Entry> entry : rootBuilders.entrySet()) { - getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation( - YangInstanceIdentifier.empty(), entry.getValue().build())); + if (!rootData.isEmpty()) { + RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach( + scattered -> scattered.shard().maybeExecuteTransactionOperation( + new MergeOperation(YangInstanceIdentifier.empty(), scattered.container()))); } } @@ -194,40 +155,21 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> rootBuilders = new HashMap<>(); - for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) { - rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier())); - } - - // Now distribute children as needed - for (DataContainerChild child : rootData.body()) { - final String shardName = shardNameFromRootChild(child); - verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child); - } - - // Now dispatch all writes - for (Entry> entry : rootBuilders.entrySet()) { - getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation( - YangInstanceIdentifier.empty(), entry.getValue().build())); - } + RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild, + getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach( + scattered -> scattered.shard().maybeExecuteTransactionOperation( + new WriteOperation(YangInstanceIdentifier.empty(), scattered.container()))); } private void executeModification(final TransactionModificationOperation operation) { - getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation); - } - - private static ContainerNode checkRootData(final NormalizedNode data) { - // Root has to be a container - checkArgument(data instanceof ContainerNode, "Invalid root data %s", data); - return (ContainerNode) data; + wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation); } private void checkModificationState(final String opName, final YangInstanceIdentifier path) { @@ -353,19 +295,15 @@ public class TransactionProxy extends AbstractDOMStoreTransaction { + private final ContainerNode container; + private final T shard; + + ShardContainer(final T shard, final ContainerNode container) { + this.shard = requireNonNull(shard); + this.container = requireNonNull(container); + } + + public T shard() { + return shard; + } + + public ContainerNode container() { + return container; + } + + @Override + public int hashCode() { + return shard.hashCode(); + } + + @Override + public boolean equals(final @Nullable Object obj) { + return obj == this || obj instanceof ShardContainer && shard.equals(((ShardContainer) obj).shard); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("shard", shard).toString(); + } + } + + private RootScatterGather() { + // Hidden on purpose + } + + /** + * Check whether a {@link NormalizedNode} represents a root container and return it cast to {@link ContainerNode}. + * + * @param node a normalized node + * @return {@code node} cast to ContainerNode + * @throws NullPointerException if {@code node} is null + * @throws IllegalArgumentException if {@code node} is not a {@link ContainerNode} + */ + public static @NonNull ContainerNode castRootNode(final NormalizedNode node) { + final var nonnull = requireNonNull(node); + checkArgument(nonnull instanceof ContainerNode, "Invalid root data %s", nonnull); + return (ContainerNode) nonnull; + } + + /** + * Reconstruct root container from a set of constituents. + * + * @param actorUtils {@link ActorUtils} reference + * @param readFutures Consitutent read futures + * @return A composite future + */ + public static @NonNull FluentFuture> gather(final ActorUtils actorUtils, + final Stream>> readFutures) { + return FluentFuture.from(Futures.transform( + Futures.allAsList(readFutures.collect(ImmutableList.toImmutableList())), input -> { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input, + actorUtils.getSchemaContext(), actorUtils.getDatastoreContext().getLogicalStoreType()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); + } + }, MoreExecutors.directExecutor())); + } + + public static @NonNull Stream> scatterAll(final ContainerNode rootNode, + final Function childToShard, final Stream allShards) { + final var builders = allShards + .collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> Builders.containerBuilder())); + for (var child : rootNode.body()) { + final var shard = childToShard.apply(child.getIdentifier()); + verifyNotNull(builders.get(shard), "Failed to find builder for %s", shard).addChild(child); + } + return streamContainers(rootNode.getIdentifier(), builders); + } + + /** + * Split root container into per-shard root containers. + * + * @param Shard reference type + * @param rootNode Root container to be split up + * @param childToShard Mapping function from child {@link PathArgument} to shard reference + * @return Stream of {@link ShardContainer}s, one for each touched shard + */ + public static @NonNull Stream> scatterTouched(final ContainerNode rootNode, + final Function childToShard) { + final var builders = new HashMap>(); + for (var child : rootNode.body()) { + builders.computeIfAbsent(childToShard.apply(child.getIdentifier()), unused -> Builders.containerBuilder()) + .addChild(child); + } + return streamContainers(rootNode.getIdentifier(), builders); + } + + private static @NonNull Stream> streamContainers(final NodeIdentifier rootId, + final Map> builders) { + return builders.entrySet().stream() + .map(entry -> new ShardContainer<>(entry.getKey(), entry.getValue().withNodeIdentifier(rootId).build())); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java index d8ff9f8b5d..83a00ff19f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java @@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID; @@ -44,11 +45,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.tree.api.DataTree; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import scala.concurrent.Promise; @RunWith(MockitoJUnitRunner.StrictStubs.class) @@ -87,7 +90,7 @@ public abstract class AbstractClientHandleTest command = clientContextProbe.expectMsgClass(InternalCommand.class); command.execute(client); //data tree mock - when(dataTree.takeSnapshot()).thenReturn(dataTreeSnapshot); + doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot(); handle = createHandle(parent); } @@ -201,8 +204,13 @@ public abstract class AbstractClientHandleTest connection = behavior.getConnection(shard); //check cached connection for same shard @@ -167,8 +168,7 @@ public abstract class AbstractDataStoreClientBehaviorTest { final ActorSelection selection = system.actorSelection(actor.path()); final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0); promise.success(shardInfo); - when(mock.findPrimaryShardAsync(SHARD)).thenReturn(promise.future()); + doReturn(promise.future()).when(mock).findPrimaryShardAsync(SHARD); return mock; } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshotTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshotTest.java index 9e8c332544..c5cdeb7df8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshotTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshotTest.java @@ -9,26 +9,23 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout; -import com.google.common.util.concurrent.ListenableFuture; import java.util.Optional; import org.junit.Before; import org.junit.Test; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class ClientSnapshotTest extends AbstractClientHandleTest { - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty(); @Before @Override public void setUp() throws Exception { super.setUp(); - when(getDataTreeSnapshot().readNode(PATH)).thenReturn(Optional.empty()); + doReturn(Optional.empty()).when(getDataTreeSnapshot()).readNode(PATH); } @Override @@ -43,15 +40,15 @@ public class ClientSnapshotTest extends AbstractClientHandleTest @Test public void testExists() throws Exception { - final ListenableFuture exists = getHandle().exists(PATH); + final var exists = getHandle().exists(PATH); verify(getDataTreeSnapshot()).readNode(PATH); assertEquals(Boolean.FALSE, getWithTimeout(exists)); } @Test public void testRead() throws Exception { - final ListenableFuture> exists = getHandle().read(PATH); + final var read = getHandle().read(PATH); verify(getDataTreeSnapshot()).readNode(PATH); - assertFalse(getWithTimeout(exists).isPresent()); + assertFalse(getWithTimeout(read).isPresent()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java index f8c469e3d2..4cd21b160d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java @@ -28,6 +28,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; @@ -39,7 +40,7 @@ public class ClientTransactionTest extends AbstractClientHandleTest