import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
}
final T ensureProxy(final YangInstanceIdentifier path) {
- final State<T> local = getState();
- final Long shard = parent.resolveShardForPath(path);
+ return ensureProxy(getState(), parent.resolveShardForPath(path));
+ }
+
+ private T ensureProxy(final State<T> localState, final Long shard) {
+ return localState.computeIfAbsent(shard, this::createProxy);
+ }
- return local.computeIfAbsent(shard, this::createProxy);
+ final Stream<T> ensureAllProxies() {
+ final var local = getState();
+ return parent.resolveAllShards().map(shard -> ensureProxy(local, shard));
}
final AbstractClientHistory parent() {
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
return client.resolveShardForPath(path);
}
+ final Stream<Long> resolveAllShards() {
+ return client.resolveAllShards();
+ }
+
+ final ActorUtils actorUtils() {
+ return client.actorUtils();
+ }
+
@Override
final void localAbort(final Throwable cause) {
final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
-import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private volatile Throwable aborted;
AbstractDataStoreClientBehavior(final ClientActorContext context,
- final BackendInfoResolver<ShardBackendInfo> resolver) {
+ final AbstractShardBackendResolver resolver) {
super(context, resolver);
singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
}
abstract Long resolveShardForPath(YangInstanceIdentifier path);
+
+ abstract Stream<Long> resolveAllShards();
+
+ final ActorUtils actorUtils() {
+ return ((AbstractShardBackendResolver) resolver()).actorUtils();
+ }
}
import com.google.common.util.concurrent.FluentFuture;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
- return ensureSnapshotProxy(path).exists(path);
+ return ensureProxy(path).exists(path);
}
public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
- return ensureSnapshotProxy(path).read(path);
+ return path.isEmpty() ? readRoot() : ensureProxy(path).read(path);
+ }
+
+ private FluentFuture<Optional<NormalizedNode>> readRoot() {
+ return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies()
+ .map(proxy -> proxy.read(YangInstanceIdentifier.empty())));
}
@Override
final AbstractProxyTransaction createProxy(final Long shard) {
return parent().createSnapshotProxy(getIdentifier(), shard);
}
-
- private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) {
- return ensureProxy(path);
- }
}
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
super(parent, transactionId);
}
- private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) {
- return ensureProxy(path);
- }
-
public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
- return ensureTransactionProxy(path).exists(path);
+ return ensureProxy(path).exists(path);
}
public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
- return ensureTransactionProxy(path).read(path);
+ return path.isEmpty() ? readRoot() : ensureProxy(path).read(path);
+ }
+
+ private FluentFuture<Optional<NormalizedNode>> readRoot() {
+ return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies()
+ .map(proxy -> proxy.read(YangInstanceIdentifier.empty())));
}
public void delete(final YangInstanceIdentifier path) {
- ensureTransactionProxy(path).delete(path);
+ if (path.isEmpty()) {
+ ensureAllProxies().forEach(proxy -> proxy.delete(YangInstanceIdentifier.empty()));
+ } else {
+ ensureProxy(path).delete(path);
+ }
}
public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
- ensureTransactionProxy(path).merge(path, data);
+ if (path.isEmpty()) {
+ mergeRoot(RootScatterGather.castRootNode(data));
+ } else {
+ ensureProxy(path).merge(path, data);
+ }
+ }
+
+ private void mergeRoot(final @NonNull ContainerNode rootData) {
+ if (!rootData.isEmpty()) {
+ RootScatterGather.scatterTouched(rootData, this::ensureProxy).forEach(
+ scattered -> scattered.shard().merge(YangInstanceIdentifier.empty(), scattered.container()));
+ }
}
public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
- ensureTransactionProxy(path).write(path, data);
+ if (path.isEmpty()) {
+ writeRoot(RootScatterGather.castRootNode(data));
+ } else {
+ ensureProxy(path).write(path, data);
+ }
+ }
+
+ private void writeRoot(final @NonNull ContainerNode rootData) {
+ RootScatterGather.scatterAll(rootData, this::ensureProxy, ensureAllProxies()).forEach(
+ scattered -> scattered.shard().write(YangInstanceIdentifier.empty(), scattered.container()));
+ }
+
+ private AbstractProxyTransaction ensureProxy(final PathArgument childId) {
+ return ensureProxy(YangInstanceIdentifier.create(childId));
}
public DOMStoreThreePhaseCommitCohort ready() {
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import java.util.function.Function;
+import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* @author Robert Varga
*/
final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
- private final Function<YangInstanceIdentifier, Long> pathToShard;
+ private final ModuleShardBackendResolver resolver;
private DistributedDataStoreClientBehavior(final ClientActorContext context,
final ModuleShardBackendResolver resolver) {
super(context, resolver);
- pathToShard = resolver::resolveShardForPath;
+ this.resolver = resolver;
}
DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorUtils actorUtils) {
@Override
Long resolveShardForPath(final YangInstanceIdentifier path) {
- return pathToShard.apply(path);
+ return resolver.resolveShardForPath(path);
+ }
+
+ @Override
+ Stream<Long> resolveAllShards() {
+ return resolver.resolveAllShards();
}
@Override
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
}
Long resolveShardForPath(final YangInstanceIdentifier path) {
- final String shardName = actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+ return resolveCookie(actorUtils().getShardStrategyFactory().getStrategy(path).findShard(path));
+ }
+
+ Stream<Long> resolveAllShards() {
+ return actorUtils().getConfiguration().getAllShardNames().stream()
+ .sorted()
+ .map(this::resolveCookie);
+ }
+
+ private @NonNull Long resolveCookie(final String shardName) {
final Long cookie = shards.get(shardName);
return cookie != null ? cookie : populateShard(shardName);
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
*/
final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
// Pre-boxed instance
- private static final Long ZERO = Long.valueOf(0);
+ private static final Long ZERO = 0L;
private SimpleDataStoreClientBehavior(final ClientActorContext context,
final SimpleShardBackendResolver resolver) {
Long resolveShardForPath(final YangInstanceIdentifier path) {
return ZERO;
}
+
+ @Override
+ Stream<Long> resolveAllShards() {
+ return Stream.of(ZERO);
+ }
}
*/
package org.opendaylight.controller.cluster.datastore;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
+import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
final SettableFuture<T> proxyFuture = SettableFuture.create();
- AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName);
+ AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
}
private FluentFuture<Optional<NormalizedNode>> readAllData() {
- final Set<String> allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames();
- final Collection<FluentFuture<Optional<NormalizedNode>>> futures = new ArrayList<>(allShardNames.size());
-
- for (String shardName : allShardNames) {
- futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty()));
- }
-
- final ListenableFuture<List<Optional<NormalizedNode>>> listFuture = Futures.allAsList(futures);
- final ListenableFuture<Optional<NormalizedNode>> aggregateFuture;
-
- aggregateFuture = Futures.transform(listFuture, input -> {
- try {
- return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
- txContextFactory.getActorUtils().getSchemaContext(),
- txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
- } catch (DataValidationFailedException e) {
- throw new IllegalArgumentException("Failed to aggregate", e);
- }
- }, MoreExecutors.directExecutor());
-
- return FluentFuture.from(aggregateFuture);
+ final var actorUtils = getActorUtils();
+ return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
+ .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty())));
}
@Override
private void deleteAllData() {
for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
- getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+ wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
}
}
checkModificationState("merge", path);
if (path.isEmpty()) {
- mergeAllData(checkRootData(data));
+ mergeAllData(RootScatterGather.castRootNode(data));
} else {
executeModification(new MergeOperation(path, data));
}
}
private void mergeAllData(final ContainerNode rootData) {
- // Populate requests for individual shards that are being touched
- final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
- for (DataContainerChild child : rootData.body()) {
- final String shardName = shardNameFromRootChild(child);
- rootBuilders.computeIfAbsent(shardName,
- unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
- .addChild(child);
- }
-
- // Now dispatch all merges
- for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
- getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
- YangInstanceIdentifier.empty(), entry.getValue().build()));
+ if (!rootData.isEmpty()) {
+ RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
+ scattered -> scattered.shard().maybeExecuteTransactionOperation(
+ new MergeOperation(YangInstanceIdentifier.empty(), scattered.container())));
}
}
checkModificationState("write", path);
if (path.isEmpty()) {
- writeAllData(checkRootData(data));
+ writeAllData(RootScatterGather.castRootNode(data));
} else {
executeModification(new WriteOperation(path, data));
}
}
private void writeAllData(final ContainerNode rootData) {
- // Open builders for all shards
- final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
- for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
- rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
- }
-
- // Now distribute children as needed
- for (DataContainerChild child : rootData.body()) {
- final String shardName = shardNameFromRootChild(child);
- verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
- }
-
- // Now dispatch all writes
- for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
- getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
- YangInstanceIdentifier.empty(), entry.getValue().build()));
- }
+ RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
+ getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
+ scattered -> scattered.shard().maybeExecuteTransactionOperation(
+ new WriteOperation(YangInstanceIdentifier.empty(), scattered.container())));
}
private void executeModification(final TransactionModificationOperation operation) {
- getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
- }
-
- private static ContainerNode checkRootData(final NormalizedNode data) {
- // Root has to be a container
- checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
- return (ContainerNode) data;
+ wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
}
private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
}
- private String shardNameFromRootChild(final DataContainerChild child) {
- return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
- }
-
private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
}
- private AbstractTransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
- return getContextWrapper(shardNameFromIdentifier(path));
+ private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
+ return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.create(childId)));
}
- private AbstractTransactionContextWrapper getContextWrapper(final String shardName) {
+ private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
if (existing != null) {
return existing;
--- /dev/null
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+/**
+ * Utility methods for dealing with datastore root {@link ContainerNode} with respect to module shards.
+ */
+public final class RootScatterGather {
+ // FIXME: Record when we have JDK17+
+ @NonNullByDefault
+ public static final class ShardContainer<T> {
+ private final ContainerNode container;
+ private final T shard;
+
+ ShardContainer(final T shard, final ContainerNode container) {
+ this.shard = requireNonNull(shard);
+ this.container = requireNonNull(container);
+ }
+
+ public T shard() {
+ return shard;
+ }
+
+ public ContainerNode container() {
+ return container;
+ }
+
+ @Override
+ public int hashCode() {
+ return shard.hashCode();
+ }
+
+ @Override
+ public boolean equals(final @Nullable Object obj) {
+ return obj == this || obj instanceof ShardContainer && shard.equals(((ShardContainer<?>) obj).shard);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("shard", shard).toString();
+ }
+ }
+
+ private RootScatterGather() {
+ // Hidden on purpose
+ }
+
+ /**
+ * Check whether a {@link NormalizedNode} represents a root container and return it cast to {@link ContainerNode}.
+ *
+ * @param node a normalized node
+ * @return {@code node} cast to ContainerNode
+ * @throws NullPointerException if {@code node} is null
+ * @throws IllegalArgumentException if {@code node} is not a {@link ContainerNode}
+ */
+ public static @NonNull ContainerNode castRootNode(final NormalizedNode node) {
+ final var nonnull = requireNonNull(node);
+ checkArgument(nonnull instanceof ContainerNode, "Invalid root data %s", nonnull);
+ return (ContainerNode) nonnull;
+ }
+
+ /**
+ * Reconstruct root container from a set of constituents.
+ *
+ * @param actorUtils {@link ActorUtils} reference
+ * @param readFutures Consitutent read futures
+ * @return A composite future
+ */
+ public static @NonNull FluentFuture<Optional<NormalizedNode>> gather(final ActorUtils actorUtils,
+ final Stream<FluentFuture<Optional<NormalizedNode>>> readFutures) {
+ return FluentFuture.from(Futures.transform(
+ Futures.allAsList(readFutures.collect(ImmutableList.toImmutableList())), input -> {
+ try {
+ return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
+ actorUtils.getSchemaContext(), actorUtils.getDatastoreContext().getLogicalStoreType());
+ } catch (DataValidationFailedException e) {
+ throw new IllegalArgumentException("Failed to aggregate", e);
+ }
+ }, MoreExecutors.directExecutor()));
+ }
+
+ public static <T> @NonNull Stream<ShardContainer<T>> scatterAll(final ContainerNode rootNode,
+ final Function<PathArgument, T> childToShard, final Stream<T> allShards) {
+ final var builders = allShards
+ .collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> Builders.containerBuilder()));
+ for (var child : rootNode.body()) {
+ final var shard = childToShard.apply(child.getIdentifier());
+ verifyNotNull(builders.get(shard), "Failed to find builder for %s", shard).addChild(child);
+ }
+ return streamContainers(rootNode.getIdentifier(), builders);
+ }
+
+ /**
+ * Split root container into per-shard root containers.
+ *
+ * @param <T> Shard reference type
+ * @param rootNode Root container to be split up
+ * @param childToShard Mapping function from child {@link PathArgument} to shard reference
+ * @return Stream of {@link ShardContainer}s, one for each touched shard
+ */
+ public static <T> @NonNull Stream<ShardContainer<T>> scatterTouched(final ContainerNode rootNode,
+ final Function<PathArgument, T> childToShard) {
+ final var builders = new HashMap<T, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>>();
+ for (var child : rootNode.body()) {
+ builders.computeIfAbsent(childToShard.apply(child.getIdentifier()), unused -> Builders.containerBuilder())
+ .addChild(child);
+ }
+ return streamContainers(rootNode.getIdentifier(), builders);
+ }
+
+ private static <T> @NonNull Stream<ShardContainer<T>> streamContainers(final NodeIdentifier rootId,
+ final Map<T, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> builders) {
+ return builders.entrySet().stream()
+ .map(entry -> new ShardContainer<>(entry.getKey(), entry.getValue().withNodeIdentifier(rootId).build()));
+ }
+}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import scala.concurrent.Promise;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
final InternalCommand<ShardBackendInfo> command = clientContextProbe.expectMsgClass(InternalCommand.class);
command.execute(client);
//data tree mock
- when(dataTree.takeSnapshot()).thenReturn(dataTreeSnapshot);
+ doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
handle = createHandle(parent);
}
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
- when(mock.findPrimaryShardAsync(any())).thenReturn(promise.future());
+ doReturn(promise.future()).when(mock).findPrimaryShardAsync(any());
+
+ final EffectiveModelContext context = mock(EffectiveModelContext.class);
+ lenient().doCallRealMethod().when(context).getQName();
+ lenient().doReturn(context).when(mock).getSchemaContext();
+ lenient().doReturn(DatastoreContext.newBuilder().build()).when(mock).getDatastoreContext();
+
return mock;
}
-
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import akka.actor.ActorRef;
public void testGetConnection() {
//set up data tree mock
final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
- when(modification.readNode(YangInstanceIdentifier.empty())).thenReturn(Optional.empty());
+ doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.empty());
final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
- when(snapshot.newModification()).thenReturn(modification);
+ doReturn(modification).when(snapshot).newModification();
final DataTree dataTree = mock(DataTree.class);
- when(dataTree.takeSnapshot()).thenReturn(snapshot);
+ doReturn(snapshot).when(dataTree).takeSnapshot();
final TestProbe backendProbe = new TestProbe(system, "backend");
final long shard = 0L;
+
behavior.createTransaction().read(YangInstanceIdentifier.empty());
final AbstractClientConnection<ShardBackendInfo> connection = behavior.getConnection(shard);
//check cached connection for same shard
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
- when(mock.findPrimaryShardAsync(SHARD)).thenReturn(promise.future());
+ doReturn(promise.future()).when(mock).findPrimaryShardAsync(SHARD);
return mock;
}
-
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ClientSnapshotTest extends AbstractClientHandleTest<ClientSnapshot> {
-
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
@Before
@Override
public void setUp() throws Exception {
super.setUp();
- when(getDataTreeSnapshot().readNode(PATH)).thenReturn(Optional.empty());
+ doReturn(Optional.empty()).when(getDataTreeSnapshot()).readNode(PATH);
}
@Override
@Test
public void testExists() throws Exception {
- final ListenableFuture<Boolean> exists = getHandle().exists(PATH);
+ final var exists = getHandle().exists(PATH);
verify(getDataTreeSnapshot()).readNode(PATH);
assertEquals(Boolean.FALSE, getWithTimeout(exists));
}
@Test
public void testRead() throws Exception {
- final ListenableFuture<Optional<NormalizedNode>> exists = getHandle().read(PATH);
+ final var read = getHandle().read(PATH);
verify(getDataTreeSnapshot()).readNode(PATH);
- assertFalse(getWithTimeout(exists).isPresent());
+ assertFalse(getWithTimeout(read).isPresent());
}
}
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
.node(QName.create("ns-1", "node-1"))
.build();
private static final ContainerNode DATA = Builders.containerBuilder()
- .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(PATH.getLastPathArgument().getNodeType()))
+ .withNodeIdentifier(NodeIdentifier.create(PATH.getLastPathArgument().getNodeType()))
.build();
@Mock
package org.opendaylight.controller.cluster.databroker.actors.dds;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.util.Set;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
@Override
protected AbstractDataStoreClientBehavior createBehavior(final ClientActorContext clientContext,
final ActorUtils context) {
- final ShardStrategyFactory factory = mock(ShardStrategyFactory.class);
final ShardStrategy strategy = mock(ShardStrategy.class);
- when(strategy.findShard(any())).thenReturn(SHARD);
- when(factory.getStrategy(any())).thenReturn(strategy);
- when(context.getShardStrategyFactory()).thenReturn(factory);
+ doReturn(SHARD).when(strategy).findShard(any());
+ final ShardStrategyFactory factory = mock(ShardStrategyFactory.class);
+ doReturn(strategy).when(factory).getStrategy(any());
+ doReturn(factory).when(context).getShardStrategyFactory();
+
+ final Configuration config = mock(Configuration.class);
+ doReturn(Set.of(SHARD)).when(config).getAllShardNames();
+ doReturn(config).when(context).getConfiguration();
+
return new DistributedDataStoreClientBehavior(clientContext, context);
}
}