X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=b08d4192b48c1ddd7b3cf3162782b68ef129c855;hb=9e7a9b3725ad25f9adf85f0ad796b7cf748795a4;hp=26d7ff8b02bf7beef1e8637fd513af26318777b4;hpb=b2af021ee27b2977961f0fec6f8bb1a4acbcdbd7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 26d7ff8b02..b08d4192b4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; @@ -15,6 +18,8 @@ import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -37,10 +42,10 @@ class TransactionContextWrapper { */ private volatile TransactionContext transactionContext; - private final TransactionIdentifier identifier; + private final OperationLimiter limiter; - TransactionContextWrapper(final TransactionIdentifier identifier) { - this.identifier = identifier; + TransactionContextWrapper(final OperationLimiter limiter) { + this.limiter = Preconditions.checkNotNull(limiter); } TransactionContext getTransactionContext() { @@ -48,7 +53,7 @@ class TransactionContextWrapper { } TransactionIdentifier getIdentifier() { - return identifier; + return limiter.getIdentifier(); } /** @@ -69,6 +74,8 @@ class TransactionContextWrapper { if (invokeOperation) { operation.invoke(transactionContext); + } else { + limiter.acquire(); } } @@ -95,10 +102,11 @@ class TransactionContextWrapper { // queued (eg a put operation from a client read Future callback that is notified // synchronously). Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. + localTransactionContext.operationHandoffComplete(); transactionContext = localTransactionContext; break; } @@ -110,9 +118,26 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for(TransactionOperation oper: operationsBatch) { + for (TransactionOperation oper : operationsBatch) { oper.invoke(localTransactionContext); } } } + + Future readyTransaction() { + // avoid the creation of a promise and a TransactionOperation + if (transactionContext != null) { + return transactionContext.readyTransaction(); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + + return promise.future(); + } }