X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=5081c9b4f8f652e55e5241733051bc8064aa96e2;hb=b0c84fd023315b49856be0df1522f9785a8dd899;hp=a0987cd5d63943979985527794dbe8471c0afe78;hpb=f89552de4942d3709d6ee84415e672c6c7de489f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index a0987cd5d6..5081c9b4f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -40,6 +41,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,23 +62,6 @@ import scala.concurrent.Promise; */ public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - private static enum TransactionState { OPEN, READY, @@ -112,7 +97,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); @@ -370,11 +353,56 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort() { + TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), + txFutureCallback.getShardName(), getTransactionChainId()); + + final OperationCallback.Reference operationCallbackRef = + new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + } + }); + future = promise.future(); + } + + return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + } + + private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, + OperationCallback.Reference operationCallbackRef) { + if(transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); + } else { + return transactionContext.readyTransaction(); + } + } + + private AbstractThreePhaseCommitCohort createMultiCommitCohort() { List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), + txFutureCallback.getShardName(), getTransactionChainId()); final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); final Future future; @@ -395,8 +423,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction sendFindPrimaryShardAsync(String shardName) { + protected Future sendFindPrimaryShardAsync(String shardName) { return actorContext.findPrimaryShardAsync(shardName); } @@ -452,20 +479,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); - findPrimaryFuture.onComplete(new OnComplete() { + findPrimaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { newTxFutureCallback.createTransactionContext(failure, null); } else { - newTxFutureCallback.setPrimaryShard(primaryShard); + newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); } } }, actorContext.getClientDispatcher()); @@ -475,7 +502,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction