X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=43bf09eb8aceee72089911717bd8d6b5e6c2c556;hb=0597bd64ca6214fd3ca3fb152ab2ca7fd3b32ed5;hp=98e115efc6270342e4f54ce99f5dcf0e95d99bc8;hpb=5b69c8e66b12a29a36457955cac4a45affd7c73f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 98e115efc6..43bf09eb8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -14,17 +14,12 @@ import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -36,12 +31,13 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; +import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather; import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -59,8 +55,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextWrappers = new TreeMap<>(); + private final Map txContextWrappers = new TreeMap<>(); private final AbstractTransactionContextFactory txContextFactory; private final TransactionType type; private TransactionState state = TransactionState.OPEN; @@ -86,7 +83,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction proxyFuture = SettableFuture.create(); - TransactionContextWrapper contextWrapper = getContextWrapper(shardName); + AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName); contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(final TransactionContext transactionContext, final Boolean havePermit) { @@ -98,7 +95,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> read(final YangInstanceIdentifier path) { + public FluentFuture> read(final YangInstanceIdentifier path) { checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); requireNonNull(path, "path should not be null"); @@ -106,58 +103,73 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> singleShardRead( - final String shardName, final YangInstanceIdentifier path) { + private FluentFuture> singleShardRead(final String shardName, + final YangInstanceIdentifier path) { return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION)); } - private FluentFuture>> readAllData() { - final Set allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames(); - final Collection>>> futures = new ArrayList<>(allShardNames.size()); - - for (String shardName : allShardNames) { - futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty())); - } - - final ListenableFuture>>> listFuture = Futures.allAsList(futures); - final ListenableFuture>> aggregateFuture; - - aggregateFuture = Futures.transform(listFuture, input -> { - try { - return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input, - txContextFactory.getActorUtils().getSchemaContext(), - txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType()); - } catch (DataValidationFailedException e) { - throw new IllegalArgumentException("Failed to aggregate", e); - } - }, MoreExecutors.directExecutor()); - - return FluentFuture.from(aggregateFuture); + private FluentFuture> readAllData() { + final var actorUtils = getActorUtils(); + return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream() + .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty()))); } @Override public void delete(final YangInstanceIdentifier path) { checkModificationState("delete", path); - executeModification(new DeleteOperation(path)); + if (path.isEmpty()) { + deleteAllData(); + } else { + executeModification(new DeleteOperation(path)); + } + } + + private void deleteAllData() { + for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) { + wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION); + } } @Override - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState("merge", path); - executeModification(new MergeOperation(path, data)); + if (path.isEmpty()) { + mergeAllData(RootScatterGather.castRootNode(data)); + } else { + executeModification(new MergeOperation(path, data)); + } + } + + private void mergeAllData(final ContainerNode rootData) { + if (!rootData.isEmpty()) { + RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach( + scattered -> scattered.shard().maybeExecuteTransactionOperation( + new MergeOperation(YangInstanceIdentifier.empty(), scattered.container()))); + } } @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState("write", path); - executeModification(new WriteOperation(path, data)); + if (path.isEmpty()) { + writeAllData(RootScatterGather.castRootNode(data)); + } else { + executeModification(new WriteOperation(path, data)); + } + } + + private void writeAllData(final ContainerNode rootData) { + RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild, + getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach( + scattered -> scattered.shard().maybeExecuteTransactionOperation( + new WriteOperation(YangInstanceIdentifier.empty(), scattered.container()))); } private void executeModification(final TransactionModificationOperation operation) { - getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation); + wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation); } private void checkModificationState(final String opName, final YangInstanceIdentifier path) { @@ -183,7 +195,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction e = Iterables.getOnlyElement( + final Entry e = Iterables.getOnlyElement( txContextWrappers.entrySet()); ret = createSingleCommitCohort(e.getKey(), e.getValue()); break; @@ -227,7 +239,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, - final TransactionContextWrapper contextWrapper) { + final AbstractTransactionContextWrapper contextWrapper) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); @@ -268,10 +280,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction cohorts = new ArrayList<>(txContextWrappers.size()); final Optional> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet())); - for (Entry e : txContextWrappers.entrySet()) { + for (Entry e : txContextWrappers.entrySet()) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - final TransactionContextWrapper wrapper = e.getValue(); + final AbstractTransactionContextWrapper wrapper = e.getValue(); // The remote tx version is obtained the via TransactionContext which may not be available yet so // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the @@ -284,20 +296,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction