X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=3fb129f3817f75b7388a335458e4b67b4c8aca09;hp=ea4e5139425f5eee21151a0cc759910d77b8f6cc;hb=b00bee7547dbba0677347e991a8674f90752f6a2;hpb=abfa9a03550cbe9fccc4420684dced175dd6d119 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index ea4e513942..3fb129f381 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,14 +7,21 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -32,15 +39,21 @@ class TransactionContextWrapper { @GuardedBy("queuedTxOperations") private final List queuedTxOperations = Lists.newArrayList(); + private final TransactionIdentifier identifier; + /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - private final TransactionIdentifier identifier; + private final OperationLimiter limiter; - TransactionContextWrapper(final TransactionIdentifier identifier) { - this.identifier = identifier; + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) { + this.identifier = Preconditions.checkNotNull(identifier); + this.limiter = new OperationLimiter(identifier, + // 1 extra permit for the ready operation + actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, + TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis())); } TransactionContext getTransactionContext() { @@ -56,7 +69,7 @@ class TransactionContextWrapper { */ private void enqueueTransactionOperation(final TransactionOperation operation) { final boolean invokeOperation; - synchronized(queuedTxOperations) { + synchronized (queuedTxOperations) { if (transactionContext == null) { LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); @@ -69,6 +82,8 @@ class TransactionContextWrapper { if (invokeOperation) { operation.invoke(transactionContext); + } else { + limiter.acquire(); } } @@ -84,7 +99,7 @@ class TransactionContextWrapper { } void executePriorTransactionOperations(final TransactionContext localTransactionContext) { - while(true) { + while (true) { // Access to queuedTxOperations and transactionContext must be protected and atomic // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing // issues and ensure no TransactionOperation is missed and that they are processed @@ -94,11 +109,15 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { + final Collection operationsBatch; + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. + localTransactionContext.operationHandOffComplete(); + if (!localTransactionContext.usesOperationLimiting()) { + limiter.releaseAll(); + } transactionContext = localTransactionContext; break; } @@ -110,9 +129,32 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for(TransactionOperation oper: operationsBatch) { + for (TransactionOperation oper : operationsBatch) { oper.invoke(localTransactionContext); } } } + + Future readyTransaction() { + // avoid the creation of a promise and a TransactionOperation + if (transactionContext != null) { + return transactionContext.readyTransaction(); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext newTransactionContext) { + promise.completeWith(newTransactionContext.readyTransaction()); + } + }); + + return promise.future(); + } + + public OperationLimiter getLimiter() { + return limiter; + } + + }