Fix TransactionContextWrapper limiter accounting 82/70382/3
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Apr 2018 14:49:06 +0000 (16:49 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Apr 2018 18:51:14 +0000 (20:51 +0200)
The fix for CONTROLLER-1814 wrecked the fragile OperationLimiter
accounting between TransactionContextWrapper and
RemoteTransactionContext. The problem is that during initial
connect time TransactionContextWrapper acquires limiter operations
and the state of the OperationLimiter is inherited by
RemoteTransactionContext. This though means that we need to know
which operation actually acquired a permit and which did not.

As it turns out, this needs to make TransactionContext methods take
an additional hint as to whether a limit attempt was made and what
was the result of it.

Furthermore the internal TransactionContextWrapper logic needs to
be changed from 'enqueue and wait' to 'wait and enqueue' strategy,
so when an operation is added to the queue and potentially picked
up by executePriorTransactionOperations() we already know whether
a permit was acquired or not.

JIRA: CONTROLLER-1823
Change-Id: I78d43a1abde8c6da6e3da2f56823bba130499133
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java

index 9fad1f1016bd3e3e33af2566cc0defef2121cee9..8c4dddc565d27aab4943e3ef61d99ce7d166e0f7 100644 (file)
@@ -45,7 +45,7 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void executeModification(final AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         incrementModificationCount();
         if (operationError == null) {
             try {
@@ -57,7 +57,8 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+            final Boolean havePermit) {
         Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
             @Override
             public void onSuccess(final T result) {
@@ -77,13 +78,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateCoordinatedCommit();
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateDirectCommit();
     }
index 009c9642e95942de7593b008bf7d78890af34e72..a6ba87c7ff5fdfcaa35bf35a408b90d04e28d346 100644 (file)
@@ -24,7 +24,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     private final Throwable failure;
 
-    NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+    NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
         super(identifier);
         this.failure = failure;
     }
@@ -35,25 +35,26 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public void executeModification(AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
     }
 
     @Override
-    public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+            final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
index 941c86116cb7926ebd13a6b55b79b32484bdd54e..27969b3e8ef405331c1e69fb1b9f912612d22538 100644 (file)
@@ -50,8 +50,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
      */
     private volatile Throwable failedModification;
 
-    protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
-            ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
+    protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+            final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) {
         super(identifier, remoteTransactionVersion);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
@@ -75,27 +75,34 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
-
+        bumpPermits(havePermit);
         return sendBatchedModifications(true, true);
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         logModificationCount();
 
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
 
+        bumpPermits(havePermit);
         Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
 
         return transformReadyReply(lastModificationsFuture);
     }
 
+    private void bumpPermits(final Boolean havePermit) {
+        if (Boolean.TRUE.equals(havePermit)) {
+            ++batchPermits;
+        }
+    }
+
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
         // Transform the last reply Future into a Future that returns the cohort actor path from
         // the last reply message. That's the end result of the ready operation.
@@ -107,7 +114,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
 
-    private void batchModification(Modification modification, boolean havePermit) {
+    private void batchModification(final Modification modification, final boolean havePermit) {
         incrementModificationCount();
         if (havePermit) {
             ++batchPermits;
@@ -129,7 +136,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return sendBatchedModifications(false, false);
     }
 
-    protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
             if (batchedModifications == null) {
@@ -167,7 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
                 actorContext.getTransactionCommitOperationTimeout());
             sent.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object success) {
+                public void onComplete(final Throwable failure, final Object success) {
                     if (failure != null) {
                         LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
                         failedModification = failure;
@@ -183,16 +190,23 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void executeModification(AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
 
-        final boolean havePermit = failedModification == null && acquireOperation();
-        batchModification(modification, havePermit);
+        final boolean permitToRelease;
+        if (havePermit == null) {
+            permitToRelease = failedModification == null && acquireOperation();
+        } else {
+            permitToRelease = havePermit.booleanValue();
+        }
+
+        batchModification(modification, permitToRelease);
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+            final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
@@ -209,14 +223,14 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        final boolean havePermit = acquireOperation();
+        final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 // We have previously acquired an operation, now release it, no matter what happened
-                if (havePermit) {
+                if (permitToRelease) {
                     limiter.release();
                 }
 
@@ -245,15 +259,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
      * @return True if a permit was successfully acquired, false otherwise
      */
     private boolean acquireOperation() {
-        if (isOperationHandOffComplete()) {
-            if (limiter.acquire()) {
-                return true;
-            }
+        Preconditions.checkState(isOperationHandOffComplete(),
+            "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
+            getIdentifier(), actor);
 
-            LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(),
-                actor);
+        if (limiter.acquire()) {
+            return true;
         }
 
+        LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
         return false;
     }
 
index b62b056d587b83d42a3613e2cf40db9c935d7c69..543834c2cbad2547e33352b8a43dd9e55b399912 100644 (file)
@@ -20,13 +20,13 @@ import scala.concurrent.Future;
 interface TransactionContext {
     void closeTransaction();
 
-    Future<ActorSelection> readyTransaction();
+    Future<ActorSelection> readyTransaction(Boolean havePermit);
 
-    void executeModification(AbstractModification modification);
+    void executeModification(AbstractModification modification, Boolean havePermit);
 
-    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
+    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
 
-    Future<Object> directCommit();
+    Future<Object> directCommit(Boolean havePermit);
 
     /**
      * Invoked by {@link TransactionContextWrapper} when it has finished handing
index 865cc60edb2d0f982ecfee672baa63f347b170c9..d4b52e52524ca894d123693db11497241a48fc7b 100644 (file)
@@ -42,7 +42,7 @@ final class TransactionContextCleanup extends FinalizablePhantomReference<Transa
 
     private final TransactionContext cleanup;
 
-    private TransactionContextCleanup(TransactionProxy referent, TransactionContext cleanup) {
+    private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
         super(referent, QUEUE);
         this.cleanup = cleanup;
     }
index a126ce95971bae232c2da0b1f9fb9aa3c550cfe6..0e1260962d37d807f53233729f3f999e996faed9 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -37,16 +38,19 @@ class TransactionContextWrapper {
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
-    private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+    private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
     private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
     private final String shardName;
 
     /**
      * The resulting TransactionContext.
      */
     private volatile TransactionContext transactionContext;
-
-    private final OperationLimiter limiter;
+    @GuardedBy("queuedTxOperations")
+    private TransactionContext deferredTransactionContext;
+    @GuardedBy("queuedTxOperations")
+    private boolean pendingEnqueue;
 
     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
             final String shardName) {
@@ -67,35 +71,77 @@ class TransactionContextWrapper {
     }
 
     /**
-     * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+     * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+     * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+     * context is not available.
      */
     private void enqueueTransactionOperation(final TransactionOperation operation) {
-        final boolean invokeOperation;
+        // We have three things to do here:
+        // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+        // - acquire a permit for the operation if we still need to enqueue it
+        // - enqueue the operation
+        //
+        // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+        // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+        // complications are:
+        // - this method may be called from the thread invoking executePriorTransactionOperations()
+        // - user may be violating API contract of using the transaction from a single thread
+
+        // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+        // the lock, we will assert that we will be enqueing another operation.
+        final TransactionContext contextOnEntry;
         synchronized (queuedTxOperations) {
-            if (transactionContext == null) {
-                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
-
-                queuedTxOperations.add(operation);
-                invokeOperation = false;
-            }  else {
-                invokeOperation = true;
+            contextOnEntry = transactionContext;
+            if (contextOnEntry == null) {
+                Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+                        identifier);
+                pendingEnqueue = true;
             }
         }
 
-        if (invokeOperation) {
-            operation.invoke(transactionContext);
-        } else {
-            if (!limiter.acquire()) {
+        // Short-circuit if there is a context
+        if (contextOnEntry != null) {
+            operation.invoke(transactionContext, null);
+            return;
+        }
+
+        boolean cleanupEnqueue = true;
+        TransactionContext finishHandoff = null;
+        try {
+            // Acquire the permit,
+            final boolean havePermit = limiter.acquire();
+            if (!havePermit) {
                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
                     shardName);
             }
+
+            // Ready to enqueue, take the lock again and append the operation
+            synchronized (queuedTxOperations) {
+                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+                pendingEnqueue = false;
+                cleanupEnqueue = false;
+                finishHandoff = deferredTransactionContext;
+                deferredTransactionContext = null;
+            }
+        } finally {
+            if (cleanupEnqueue) {
+                synchronized (queuedTxOperations) {
+                    pendingEnqueue = false;
+                    finishHandoff = deferredTransactionContext;
+                    deferredTransactionContext = null;
+                }
+            }
+            if (finishHandoff != null) {
+                executePriorTransactionOperations(finishHandoff);
+            }
         }
     }
 
     void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
         } else {
             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
             // callback to be executed after the Tx is created.
@@ -114,17 +160,23 @@ class TransactionContextWrapper {
             // in case a TransactionOperation results in another transaction operation being
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
-            final Collection<TransactionOperation> operationsBatch;
+            final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
             synchronized (queuedTxOperations) {
                 if (queuedTxOperations.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    localTransactionContext.operationHandOffComplete();
-                    if (!localTransactionContext.usesOperationLimiting()) {
-                        limiter.releaseAll();
+                    if (!pendingEnqueue) {
+                        // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+                        localTransactionContext.operationHandOffComplete();
+                        if (!localTransactionContext.usesOperationLimiting()) {
+                            limiter.releaseAll();
+                        }
+
+                        // This is null-to-non-null transition after which we are releasing the lock and not doing
+                        // any further processing.
+                        transactionContext = localTransactionContext;
+                    } else {
+                        deferredTransactionContext = localTransactionContext;
                     }
-                    transactionContext = localTransactionContext;
-                    break;
+                    return;
                 }
 
                 operationsBatch = new ArrayList<>(queuedTxOperations);
@@ -134,32 +186,31 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for (TransactionOperation oper : operationsBatch) {
-                oper.invoke(localTransactionContext);
+            for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+                oper.getKey().invoke(localTransactionContext, oper.getValue());
             }
         }
     }
 
     Future<ActorSelection> readyTransaction() {
         // avoid the creation of a promise and a TransactionOperation
-        if (transactionContext != null) {
-            return transactionContext.readyTransaction();
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null);
         }
 
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(TransactionContext newTransactionContext) {
-                promise.completeWith(newTransactionContext.readyTransaction());
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
             }
         });
 
         return promise.future();
     }
 
-    public OperationLimiter getLimiter() {
+    OperationLimiter getLimiter() {
         return limiter;
     }
-
-
 }
index 3defd2cc50a98a1299ed79de1d2af9f8a7ccb5c7..962d26133b1b546b1953bb8e0cd7667593fda269 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import org.eclipse.jdt.annotation.Nullable;
+
 /**
  * Abstract superclass for transaction operations which should be executed
  * on a {@link TransactionContext} at a later point in time.
@@ -16,6 +18,8 @@ abstract class TransactionOperation {
      * Execute the delayed operation.
      *
      * @param transactionContext the TransactionContext
+     * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
+     *                   attempt to acquire a permit.
      */
-    protected abstract void invoke(TransactionContext transactionContext);
+    protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
 }
index f91da9ade330135fed0d681dc9458555e216ea24..68f5393c910bc4fc1d8d502e26b91e9da8103b86 100644 (file)
@@ -92,8 +92,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(final TransactionContext transactionContext) {
-                transactionContext.executeRead(readCmd, proxyFuture);
+            public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+                transactionContext.executeRead(readCmd, proxyFuture, havePermit);
             }
         });
 
@@ -169,8 +169,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
-            protected void invoke(final TransactionContext transactionContext) {
-                transactionContext.executeModification(modification);
+            protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+                transactionContext.executeModification(modification, havePermit);
             }
         });
     }
@@ -203,7 +203,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
-                public void invoke(final TransactionContext transactionContext) {
+                public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
                     transactionContext.closeTransaction();
                 }
             });
@@ -257,14 +257,15 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             final Promise promise = akka.dispatch.Futures.promise();
             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
-                public void invoke(final TransactionContext newTransactionContext) {
-                    promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef));
+                public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                    promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
+                        havePermit));
                 }
             });
             future = promise.future();
         } else {
             // avoid the creation of a promise and a TransactionOperation
-            future = getDirectCommitFuture(transactionContext, operationCallbackRef);
+            future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
         }
 
         return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
@@ -272,12 +273,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
-            final OperationCallback.Reference operationCallbackRef) {
+            final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
         TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
                 txContextFactory.getActorContext());
         operationCallbackRef.set(rateLimitingCallback);
         rateLimitingCallback.run();
-        return transactionContext.directCommit();
+        return transactionContext.directCommit(havePermit);
     }
 
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
index 016b4b792789692a14799a5506fefea84ad4da56..e63501de6ca889012cb7a7d6cbad28aadc558c63 100644 (file)
@@ -68,7 +68,8 @@ public class LocalTransactionContextTest {
     public void testWrite() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -76,14 +77,15 @@ public class LocalTransactionContextTest {
     public void testMerge() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
     }
 
     @Test
     public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
     }
 
@@ -95,7 +97,7 @@ public class LocalTransactionContextTest {
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
             .read(yangInstanceIdentifier);
         localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+                SettableFuture.create(), null);
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -104,7 +106,7 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.<Boolean>create());
+                SettableFuture.create(), null);
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
@@ -114,7 +116,7 @@ public class LocalTransactionContextTest {
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
 
-        Future<ActorSelection> future = localTransactionContext.readyTransaction();
+        Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
         assertTrue(future.isCompleted());
 
         verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
@@ -127,8 +129,10 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
 
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
@@ -142,8 +146,10 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
 
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
@@ -156,19 +162,19 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
 
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
 
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
 
         doReadyWithExpectedError(error);
     }
 
-    private void doReadyWithExpectedError(RuntimeException expError) {
+    private void doReadyWithExpectedError(final RuntimeException expError) {
         LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
 
-        localTransactionContext.readyTransaction();
+        localTransactionContext.readyTransaction(null);
     }
 }
index 8ae6a49b337182603b6836c135baa7fd456c24e4..f945208c426e65fb4e8d596c657a93d7f0d4fb1b 100644 (file)
@@ -72,8 +72,8 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
      */
     @Test
     public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
         assertEquals(2, limiter.availablePermits());
 
         Future<Object> future = txContext.sendBatchedModifications();
@@ -90,12 +90,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
                 assertEquals(4, limiter.availablePermits());
 
                 // The transaction has failed, no throttling should occur
-                txContext.executeModification(DELETE);
+                txContext.executeModification(DELETE, null);
                 assertEquals(4, limiter.availablePermits());
 
                 // Executing a read should result in immediate failure
                 final SettableFuture<Boolean> readFuture = SettableFuture.create();
-                txContext.executeRead(new DataExists(), readFuture);
+                txContext.executeRead(new DataExists(), readFuture, null);
                 assertTrue(readFuture.isDone());
                 try {
                     readFuture.get();
@@ -106,7 +106,7 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
             }
         });
 
-        future = txContext.directCommit();
+        future = txContext.directCommit(null);
 
         msg = kit.expectMsgClass(BatchedModifications.class);
         // Modification should have been thrown away by the dropped transmit induced by executeRead()
@@ -131,12 +131,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
      */
     @Test
     public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
         assertEquals(0, limiter.availablePermits());
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
         // Last acquire should have failed ...
         assertEquals(0, limiter.availablePermits());