X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=b03398093c9ddcd838b8121b42f94a026acf4e4c;hb=8b81403fe18c40e3efe47a2147844b0fea1b23ff;hp=f645608dd9b96c327153c804f544a0b2eacb16b3;hpb=e448e4e5f1f071aa61152b2f49b239d878c0a580;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f645608dd9..b03398093c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; @@ -237,6 +238,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, final TransactionContextWrapper contextWrapper) { @@ -252,43 +254,52 @@ public class TransactionProxy extends AbstractDOMStoreTransaction getReadyOrDirectCommitFuture(TransactionContext transactionContext, + private Future getDirectCommitFuture(TransactionContext transactionContext, OperationCallback.Reference operationCallbackRef) { - if (transactionContext.supportsDirectCommit()) { - TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( - txContextFactory.getActorContext()); - operationCallbackRef.set(rateLimitingCallback); - rateLimitingCallback.run(); - return transactionContext.directCommit(); - } else { - return transactionContext.readyTransaction(); - } + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorContext()); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); } private AbstractThreePhaseCommitCohort createMultiCommitCohort( final Set> txContextWrapperEntries) { - final List> cohortFutures = new ArrayList<>(txContextWrapperEntries.size()); + final List cohorts = new ArrayList<>(txContextWrapperEntries.size()); for (Entry e : txContextWrapperEntries) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - cohortFutures.add(e.getValue().readyTransaction()); + final TransactionContextWrapper wrapper = e.getValue(); + + // The remote tx version is obtained the via TransactionContext which may not be available yet so + // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the + // TransactionContext is available. + Supplier txVersionSupplier = new Supplier() { + @Override + public Short get() { + return wrapper.getTransactionContext().getTransactionVersion(); + } + }; + + cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier)); } - return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, + getIdentifier().toString()); } private String shardNameFromIdentifier(final YangInstanceIdentifier path) {