BUG-5280: switch transactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
index 137f6529c710f37d5e8c57beaa7cd8892336307d..ef9ee68bf016df46e5dfcd1711d42adcc20f4664 100644 (file)
@@ -7,14 +7,21 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 /**
  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
@@ -32,15 +39,20 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
 
+    private final TransactionIdentifier identifier;
+
     /**
      * The resulting TransactionContext.
      */
     private volatile TransactionContext transactionContext;
 
-    private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
 
-    TransactionContextWrapper(TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.limiter = new OperationLimiter(identifier,
+                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+                TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
     }
 
     TransactionContext getTransactionContext() {
@@ -54,19 +66,23 @@ class TransactionContextWrapper {
     /**
      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
      */
-    private void enqueueTransactionOperation(TransactionOperation operation) {
-        boolean invokeOperation = true;
-        synchronized(queuedTxOperations) {
-            if(transactionContext == null) {
+    private void enqueueTransactionOperation(final TransactionOperation operation) {
+        final boolean invokeOperation;
+        synchronized (queuedTxOperations) {
+            if (transactionContext == null) {
                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
 
-                invokeOperation = false;
                 queuedTxOperations.add(operation);
+                invokeOperation = false;
+            }  else {
+                invokeOperation = true;
             }
         }
 
-        if(invokeOperation) {
+        if (invokeOperation) {
             operation.invoke(transactionContext);
+        } else {
+            limiter.acquire();
         }
     }
 
@@ -81,7 +97,7 @@ class TransactionContextWrapper {
         }
     }
 
-    void executePriorTransactionOperations(TransactionContext localTransactionContext) {
+    void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
         while(true) {
             // Access to queuedTxOperations and transactionContext must be protected and atomic
             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
@@ -93,10 +109,14 @@ class TransactionContextWrapper {
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
             Collection<TransactionOperation> operationsBatch = null;
-            synchronized(queuedTxOperations) {
-                if(queuedTxOperations.isEmpty()) {
+            synchronized (queuedTxOperations) {
+                if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
+                    localTransactionContext.operationHandOffComplete();
+                    if(!localTransactionContext.usesOperationLimiting()){
+                        limiter.releaseAll();
+                    }
                     transactionContext = localTransactionContext;
                     break;
                 }
@@ -108,9 +128,32 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
+            for (TransactionOperation oper : operationsBatch) {
                 oper.invoke(localTransactionContext);
             }
         }
     }
+
+    Future<ActorSelection> readyTransaction() {
+        // avoid the creation of a promise and a TransactionOperation
+        if (transactionContext != null) {
+            return transactionContext.readyTransaction();
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(TransactionContext transactionContext) {
+                promise.completeWith(transactionContext.readyTransaction());
+            }
+        });
+
+        return promise.future();
+    }
+
+    public OperationLimiter getLimiter() {
+        return limiter;
+    }
+
+
 }