X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=0e1260962d37d807f53233729f3f999e996faed9;hb=dc295d9be77748d7e695d003a02d299d493abc8d;hp=a126ce95971bae232c2da0b1f9fb9aa3c550cfe6;hpb=6744682fb2b843b23449e5410d2098ecc9e4b62f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index a126ce9597..0e1260962d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.Futures; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -37,16 +38,19 @@ class TransactionContextWrapper { * The list of transaction operations to execute once the TransactionContext becomes available. */ @GuardedBy("queuedTxOperations") - private final List queuedTxOperations = Lists.newArrayList(); + private final List> queuedTxOperations = new ArrayList<>(); private final TransactionIdentifier identifier; + private final OperationLimiter limiter; private final String shardName; /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - - private final OperationLimiter limiter; + @GuardedBy("queuedTxOperations") + private TransactionContext deferredTransactionContext; + @GuardedBy("queuedTxOperations") + private boolean pendingEnqueue; TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext, final String shardName) { @@ -67,35 +71,77 @@ class TransactionContextWrapper { } /** - * Adds a TransactionOperation to be executed once the TransactionContext becomes available. + * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called + * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the + * context is not available. */ private void enqueueTransactionOperation(final TransactionOperation operation) { - final boolean invokeOperation; + // We have three things to do here: + // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained + // - acquire a permit for the operation if we still need to enqueue it + // - enqueue the operation + // + // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the + // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further + // complications are: + // - this method may be called from the thread invoking executePriorTransactionOperations() + // - user may be violating API contract of using the transaction from a single thread + + // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have + // the lock, we will assert that we will be enqueing another operation. + final TransactionContext contextOnEntry; synchronized (queuedTxOperations) { - if (transactionContext == null) { - LOG.debug("Tx {} Queuing TransactionOperation", identifier); - - queuedTxOperations.add(operation); - invokeOperation = false; - } else { - invokeOperation = true; + contextOnEntry = transactionContext; + if (contextOnEntry == null) { + Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", + identifier); + pendingEnqueue = true; } } - if (invokeOperation) { - operation.invoke(transactionContext); - } else { - if (!limiter.acquire()) { + // Short-circuit if there is a context + if (contextOnEntry != null) { + operation.invoke(transactionContext, null); + return; + } + + boolean cleanupEnqueue = true; + TransactionContext finishHandoff = null; + try { + // Acquire the permit, + final boolean havePermit = limiter.acquire(); + if (!havePermit) { LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, shardName); } + + // Ready to enqueue, take the lock again and append the operation + synchronized (queuedTxOperations) { + LOG.debug("Tx {} Queuing TransactionOperation", identifier); + queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit)); + pendingEnqueue = false; + cleanupEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } finally { + if (cleanupEnqueue) { + synchronized (queuedTxOperations) { + pendingEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } + if (finishHandoff != null) { + executePriorTransactionOperations(finishHandoff); + } } } void maybeExecuteTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + op.invoke(localContext, null); } else { // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future // callback to be executed after the Tx is created. @@ -114,17 +160,23 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - final Collection operationsBatch; + final Collection> operationsBatch; synchronized (queuedTxOperations) { if (queuedTxOperations.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - localTransactionContext.operationHandOffComplete(); - if (!localTransactionContext.usesOperationLimiting()) { - limiter.releaseAll(); + if (!pendingEnqueue) { + // We're done invoking the TransactionOperations so we can now publish the TransactionContext. + localTransactionContext.operationHandOffComplete(); + if (!localTransactionContext.usesOperationLimiting()) { + limiter.releaseAll(); + } + + // This is null-to-non-null transition after which we are releasing the lock and not doing + // any further processing. + transactionContext = localTransactionContext; + } else { + deferredTransactionContext = localTransactionContext; } - transactionContext = localTransactionContext; - break; + return; } operationsBatch = new ArrayList<>(queuedTxOperations); @@ -134,32 +186,31 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for (TransactionOperation oper : operationsBatch) { - oper.invoke(localTransactionContext); + for (Entry oper : operationsBatch) { + oper.getKey().invoke(localTransactionContext, oper.getValue()); } } } Future readyTransaction() { // avoid the creation of a promise and a TransactionOperation - if (transactionContext != null) { - return transactionContext.readyTransaction(); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + return localContext.readyTransaction(null); } final Promise promise = Futures.promise(); enqueueTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext newTransactionContext) { - promise.completeWith(newTransactionContext.readyTransaction()); + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(newTransactionContext.readyTransaction(havePermit)); } }); return promise.future(); } - public OperationLimiter getLimiter() { + OperationLimiter getLimiter() { return limiter; } - - }