Fix TransactionContextWrapper limiter accounting
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
index a126ce95971bae232c2da0b1f9fb9aa3c550cfe6..0e1260962d37d807f53233729f3f999e996faed9 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -37,16 +38,19 @@ class TransactionContextWrapper {
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
-    private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+    private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
     private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
     private final String shardName;
 
     /**
      * The resulting TransactionContext.
      */
     private volatile TransactionContext transactionContext;
-
-    private final OperationLimiter limiter;
+    @GuardedBy("queuedTxOperations")
+    private TransactionContext deferredTransactionContext;
+    @GuardedBy("queuedTxOperations")
+    private boolean pendingEnqueue;
 
     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
             final String shardName) {
@@ -67,35 +71,77 @@ class TransactionContextWrapper {
     }
 
     /**
-     * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+     * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+     * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+     * context is not available.
      */
     private void enqueueTransactionOperation(final TransactionOperation operation) {
-        final boolean invokeOperation;
+        // We have three things to do here:
+        // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+        // - acquire a permit for the operation if we still need to enqueue it
+        // - enqueue the operation
+        //
+        // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+        // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+        // complications are:
+        // - this method may be called from the thread invoking executePriorTransactionOperations()
+        // - user may be violating API contract of using the transaction from a single thread
+
+        // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+        // the lock, we will assert that we will be enqueing another operation.
+        final TransactionContext contextOnEntry;
         synchronized (queuedTxOperations) {
-            if (transactionContext == null) {
-                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
-
-                queuedTxOperations.add(operation);
-                invokeOperation = false;
-            }  else {
-                invokeOperation = true;
+            contextOnEntry = transactionContext;
+            if (contextOnEntry == null) {
+                Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+                        identifier);
+                pendingEnqueue = true;
             }
         }
 
-        if (invokeOperation) {
-            operation.invoke(transactionContext);
-        } else {
-            if (!limiter.acquire()) {
+        // Short-circuit if there is a context
+        if (contextOnEntry != null) {
+            operation.invoke(transactionContext, null);
+            return;
+        }
+
+        boolean cleanupEnqueue = true;
+        TransactionContext finishHandoff = null;
+        try {
+            // Acquire the permit,
+            final boolean havePermit = limiter.acquire();
+            if (!havePermit) {
                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
                     shardName);
             }
+
+            // Ready to enqueue, take the lock again and append the operation
+            synchronized (queuedTxOperations) {
+                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+                pendingEnqueue = false;
+                cleanupEnqueue = false;
+                finishHandoff = deferredTransactionContext;
+                deferredTransactionContext = null;
+            }
+        } finally {
+            if (cleanupEnqueue) {
+                synchronized (queuedTxOperations) {
+                    pendingEnqueue = false;
+                    finishHandoff = deferredTransactionContext;
+                    deferredTransactionContext = null;
+                }
+            }
+            if (finishHandoff != null) {
+                executePriorTransactionOperations(finishHandoff);
+            }
         }
     }
 
     void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
         } else {
             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
             // callback to be executed after the Tx is created.
@@ -114,17 +160,23 @@ class TransactionContextWrapper {
             // in case a TransactionOperation results in another transaction operation being
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
-            final Collection<TransactionOperation> operationsBatch;
+            final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
             synchronized (queuedTxOperations) {
                 if (queuedTxOperations.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    localTransactionContext.operationHandOffComplete();
-                    if (!localTransactionContext.usesOperationLimiting()) {
-                        limiter.releaseAll();
+                    if (!pendingEnqueue) {
+                        // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+                        localTransactionContext.operationHandOffComplete();
+                        if (!localTransactionContext.usesOperationLimiting()) {
+                            limiter.releaseAll();
+                        }
+
+                        // This is null-to-non-null transition after which we are releasing the lock and not doing
+                        // any further processing.
+                        transactionContext = localTransactionContext;
+                    } else {
+                        deferredTransactionContext = localTransactionContext;
                     }
-                    transactionContext = localTransactionContext;
-                    break;
+                    return;
                 }
 
                 operationsBatch = new ArrayList<>(queuedTxOperations);
@@ -134,32 +186,31 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for (TransactionOperation oper : operationsBatch) {
-                oper.invoke(localTransactionContext);
+            for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+                oper.getKey().invoke(localTransactionContext, oper.getValue());
             }
         }
     }
 
     Future<ActorSelection> readyTransaction() {
         // avoid the creation of a promise and a TransactionOperation
-        if (transactionContext != null) {
-            return transactionContext.readyTransaction();
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null);
         }
 
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(TransactionContext newTransactionContext) {
-                promise.completeWith(newTransactionContext.readyTransaction());
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
             }
         });
 
         return promise.future();
     }
 
-    public OperationLimiter getLimiter() {
+    OperationLimiter getLimiter() {
         return limiter;
     }
-
-
 }