Release permits as transactions are replayed
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
index 26d7ff8b02bf7beef1e8637fd513af26318777b4..d11ee3e0bd318d6f6cf9476f5694e2626c4b4ba1 100644 (file)
@@ -7,14 +7,22 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 /**
  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
@@ -30,17 +38,28 @@ class TransactionContextWrapper {
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
-    private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+    private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
+    private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
+    private final String shardName;
 
     /**
      * The resulting TransactionContext.
      */
     private volatile TransactionContext transactionContext;
-
-    private final TransactionIdentifier identifier;
-
-    TransactionContextWrapper(final TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    @GuardedBy("queuedTxOperations")
+    private TransactionContext deferredTransactionContext;
+    @GuardedBy("queuedTxOperations")
+    private boolean pendingEnqueue;
+
+    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
+            final String shardName) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.limiter = new OperationLimiter(identifier,
+                // 1 extra permit for the ready operation
+                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
+                TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
+        this.shardName = Preconditions.checkNotNull(shardName);
     }
 
     TransactionContext getTransactionContext() {
@@ -52,30 +71,77 @@ class TransactionContextWrapper {
     }
 
     /**
-     * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+     * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+     * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+     * context is not available.
      */
     private void enqueueTransactionOperation(final TransactionOperation operation) {
-        final boolean invokeOperation;
+        // We have three things to do here:
+        // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+        // - acquire a permit for the operation if we still need to enqueue it
+        // - enqueue the operation
+        //
+        // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+        // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+        // complications are:
+        // - this method may be called from the thread invoking executePriorTransactionOperations()
+        // - user may be violating API contract of using the transaction from a single thread
+
+        // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+        // the lock, we will assert that we will be enqueing another operation.
+        final TransactionContext contextOnEntry;
         synchronized (queuedTxOperations) {
-            if (transactionContext == null) {
-                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
-
-                queuedTxOperations.add(operation);
-                invokeOperation = false;
-            }  else {
-                invokeOperation = true;
+            contextOnEntry = transactionContext;
+            if (contextOnEntry == null) {
+                Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+                        identifier);
+                pendingEnqueue = true;
             }
         }
 
-        if (invokeOperation) {
-            operation.invoke(transactionContext);
+        // Short-circuit if there is a context
+        if (contextOnEntry != null) {
+            operation.invoke(transactionContext, null);
+            return;
+        }
+
+        boolean cleanupEnqueue = true;
+        TransactionContext finishHandoff = null;
+        try {
+            // Acquire the permit,
+            final boolean havePermit = limiter.acquire();
+            if (!havePermit) {
+                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
+                    shardName);
+            }
+
+            // Ready to enqueue, take the lock again and append the operation
+            synchronized (queuedTxOperations) {
+                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+                pendingEnqueue = false;
+                cleanupEnqueue = false;
+                finishHandoff = deferredTransactionContext;
+                deferredTransactionContext = null;
+            }
+        } finally {
+            if (cleanupEnqueue) {
+                synchronized (queuedTxOperations) {
+                    pendingEnqueue = false;
+                    finishHandoff = deferredTransactionContext;
+                    deferredTransactionContext = null;
+                }
+            }
+            if (finishHandoff != null) {
+                executePriorTransactionOperations(finishHandoff);
+            }
         }
     }
 
     void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
         } else {
             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
             // callback to be executed after the Tx is created.
@@ -84,7 +150,7 @@ class TransactionContextWrapper {
     }
 
     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
-        while(true) {
+        while (true) {
             // Access to queuedTxOperations and transactionContext must be protected and atomic
             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
             // issues and ensure no TransactionOperation is missed and that they are processed
@@ -94,25 +160,59 @@ class TransactionContextWrapper {
             // in case a TransactionOperation results in another transaction operation being
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
-            Collection<TransactionOperation> operationsBatch = null;
-            synchronized(queuedTxOperations) {
-                if(queuedTxOperations.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    transactionContext = localTransactionContext;
-                    break;
+            final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
+            synchronized (queuedTxOperations) {
+                if (queuedTxOperations.isEmpty()) {
+                    if (!pendingEnqueue) {
+                        // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+                        localTransactionContext.operationHandOffComplete();
+
+                        // This is null-to-non-null transition after which we are releasing the lock and not doing
+                        // any further processing.
+                        transactionContext = localTransactionContext;
+                    } else {
+                        deferredTransactionContext = localTransactionContext;
+                    }
+                    return;
                 }
 
                 operationsBatch = new ArrayList<>(queuedTxOperations);
                 queuedTxOperations.clear();
             }
 
-            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-            // A slight down-side is that we need to re-acquire the lock below but this should
-            // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
-                oper.invoke(localTransactionContext);
+            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
+            // that we need to re-acquire the lock below but this should be negligible.
+            for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+                final Boolean permit = oper.getValue();
+                if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
+                    // If the context is not using limiting we need to release operations as we are queueing them, so
+                    // user threads are not charged for them.
+                    limiter.release();
+                }
+                oper.getKey().invoke(localTransactionContext, permit);
             }
         }
     }
+
+    Future<ActorSelection> readyTransaction() {
+        // avoid the creation of a promise and a TransactionOperation
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null);
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
+            }
+        });
+
+        return promise.future();
+    }
+
+    OperationLimiter getLimiter() {
+        return limiter;
+    }
 }