X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=e397ab501c064adf98c5ee6c2f6708f05eb6f2fe;hb=e8746b7ae6620d9e0dc159f2a13d3385d6197c56;hp=a0987cd5d63943979985527794dbe8471c0afe78;hpb=f89552de4942d3709d6ee84415e672c6c7de489f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index a0987cd5d6..e397ab501c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -40,6 +41,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,7 +160,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); @@ -370,10 +372,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort() { + TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), + txFutureCallback.getShardName(), transactionChainId); + + final OperationCallback.Reference operationCallbackRef = + new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + } + }); + future = promise.future(); + } + + return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + } + + private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, + OperationCallback.Reference operationCallbackRef) { + if(transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); + } else { + return transactionContext.readyTransaction(); + } + } + + private AbstractThreePhaseCommitCohort createMultiCommitCohort() { List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), txFutureCallback.getShardName(), transactionChainId); final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); @@ -395,8 +442,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction sendFindPrimaryShardAsync(String shardName) { + protected Future sendFindPrimaryShardAsync(String shardName) { return actorContext.findPrimaryShardAsync(shardName); } @@ -452,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); - findPrimaryFuture.onComplete(new OnComplete() { + findPrimaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { newTxFutureCallback.createTransactionContext(failure, null); } else { - newTxFutureCallback.setPrimaryShard(primaryShard); + newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); } } }, actorContext.getClientDispatcher());