X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=ef9ee68bf016df46e5dfcd1711d42adcc20f4664;hb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;hp=26d7ff8b02bf7beef1e8637fd513af26318777b4;hpb=c7e1ddeaf842ebb696c8dd38c0ca14c925ee31a1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 26d7ff8b02..ef9ee68bf0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,14 +7,21 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -32,15 +39,20 @@ class TransactionContextWrapper { @GuardedBy("queuedTxOperations") private final List queuedTxOperations = Lists.newArrayList(); + private final TransactionIdentifier identifier; + /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - private final TransactionIdentifier identifier; + private final OperationLimiter limiter; - TransactionContextWrapper(final TransactionIdentifier identifier) { - this.identifier = identifier; + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) { + this.identifier = Preconditions.checkNotNull(identifier); + this.limiter = new OperationLimiter(identifier, + actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation + TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis())); } TransactionContext getTransactionContext() { @@ -69,6 +81,8 @@ class TransactionContextWrapper { if (invokeOperation) { operation.invoke(transactionContext); + } else { + limiter.acquire(); } } @@ -95,10 +109,14 @@ class TransactionContextWrapper { // queued (eg a put operation from a client read Future callback that is notified // synchronously). Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. + localTransactionContext.operationHandOffComplete(); + if(!localTransactionContext.usesOperationLimiting()){ + limiter.releaseAll(); + } transactionContext = localTransactionContext; break; } @@ -110,9 +128,32 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for(TransactionOperation oper: operationsBatch) { + for (TransactionOperation oper : operationsBatch) { oper.invoke(localTransactionContext); } } } + + Future readyTransaction() { + // avoid the creation of a promise and a TransactionOperation + if (transactionContext != null) { + return transactionContext.readyTransaction(); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + + return promise.future(); + } + + public OperationLimiter getLimiter() { + return limiter; + } + + }