X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=98e115efc6270342e4f54ce99f5dcf0e95d99bc8;hp=155bb1fd3e303cfe8aa929fb44322bad90e2e949;hb=5b69c8e66b12a29a36457955cac4a45affd7c73f;hpb=3859df9beca8f13f1ff2b2744ed3470a1715bec3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 155bb1fd3e..98e115efc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,8 +12,6 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; @@ -31,13 +29,12 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction; @@ -106,7 +103,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> singleShardRead( @@ -119,68 +116,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>>> futures = new ArrayList<>(allShardNames.size()); for (String shardName : allShardNames) { - futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY)); + futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty())); } final ListenableFuture>>> listFuture = Futures.allAsList(futures); final ListenableFuture>> aggregateFuture; - aggregateFuture = Futures.transform(listFuture, - (Function>>, Optional>>) input -> { - try { - return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, - txContextFactory.getActorUtils().getSchemaContext(), - txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType()); - } catch (DataValidationFailedException e) { - throw new IllegalArgumentException("Failed to aggregate", e); - } - }, MoreExecutors.directExecutor()); + aggregateFuture = Futures.transform(listFuture, input -> { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input, + txContextFactory.getActorUtils().getSchemaContext(), + txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); + } + }, MoreExecutors.directExecutor()); return FluentFuture.from(aggregateFuture); } @Override public void delete(final YangInstanceIdentifier path) { - executeModification(new DeleteModification(path)); + checkModificationState("delete", path); + + executeModification(new DeleteOperation(path)); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - executeModification(new MergeModification(path, data)); + checkModificationState("merge", path); + + executeModification(new MergeOperation(path, data)); } @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - executeModification(new WriteModification(path, data)); - } - - private void executeModification(final AbstractModification modification) { - checkModificationState(); + checkModificationState("write", path); - LOG.trace("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(), - modification.getPath()); + executeModification(new WriteOperation(path, data)); + } - TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath()); - contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { - @Override - protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) { - transactionContext.executeModification(modification, havePermit); - } - }); + private void executeModification(final TransactionModificationOperation operation) { + getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation); } - private void checkModificationState() { + private void checkModificationState(final String opName, final YangInstanceIdentifier path) { checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed"); + LOG.trace("Tx {} {} {}", getIdentifier(), opName, path); } private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) { state = newState; return true; - } else { - return false; } + return false; } @Override @@ -285,10 +276,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion(); - cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames), - txVersionSupplier)); + () -> wrapper.getTransactionContext().getTransactionVersion())); } return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());