Fix TransactionContextWrapper limiter accounting 39/70439/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Apr 2018 14:49:06 +0000 (16:49 +0200)
committerRobert Varga <nite@hq.sk>
Sat, 7 Apr 2018 00:57:45 +0000 (00:57 +0000)
The fix for CONTROLLER-1814 wrecked the fragile OperationLimiter
accounting between TransactionContextWrapper and
RemoteTransactionContext. The problem is that during initial
connect time TransactionContextWrapper acquires limiter operations
and the state of the OperationLimiter is inherited by
RemoteTransactionContext. This though means that we need to know
which operation actually acquired a permit and which did not.

As it turns out, this needs to make TransactionContext methods take
an additional hint as to whether a limit attempt was made and what
was the result of it.

Furthermore the internal TransactionContextWrapper logic needs to
be changed from 'enqueue and wait' to 'wait and enqueue' strategy,
so when an operation is added to the queue and potentially picked
up by executePriorTransactionOperations() we already know whether
a permit was acquired or not.

JIRA: CONTROLLER-1823
Change-Id: I78d43a1abde8c6da6e3da2f56823bba130499133
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b69500a51978c3d3ef639345a1a97a58cc3f6bb8)
(cherry picked from commit dc295d9be77748d7e695d003a02d299d493abc8d)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java

index 9fad1f1..8c4dddc 100644 (file)
@@ -45,7 +45,7 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void executeModification(final AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         incrementModificationCount();
         if (operationError == null) {
             try {
@@ -57,7 +57,8 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+            final Boolean havePermit) {
         Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
             @Override
             public void onSuccess(final T result) {
@@ -77,13 +78,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateCoordinatedCommit();
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateDirectCommit();
     }
index b9c2e13..d14a936 100644 (file)
@@ -24,7 +24,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     private final Throwable failure;
 
-    NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+    NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
         super(identifier);
         this.failure = failure;
     }
@@ -35,25 +35,26 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public void executeModification(AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
     }
 
     @Override
-    public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+            final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
index 941c861..27969b3 100644 (file)
@@ -50,8 +50,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
      */
     private volatile Throwable failedModification;
 
-    protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
-            ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
+    protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+            final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) {
         super(identifier, remoteTransactionVersion);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
@@ -75,27 +75,34 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<Object> directCommit() {
+    public Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
-
+        bumpPermits(havePermit);
         return sendBatchedModifications(true, true);
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction() {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
         logModificationCount();
 
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
 
+        bumpPermits(havePermit);
         Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
 
         return transformReadyReply(lastModificationsFuture);
     }
 
+    private void bumpPermits(final Boolean havePermit) {
+        if (Boolean.TRUE.equals(havePermit)) {
+            ++batchPermits;
+        }
+    }
+
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
         // Transform the last reply Future into a Future that returns the cohort actor path from
         // the last reply message. That's the end result of the ready operation.
@@ -107,7 +114,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
 
-    private void batchModification(Modification modification, boolean havePermit) {
+    private void batchModification(final Modification modification, final boolean havePermit) {
         incrementModificationCount();
         if (havePermit) {
             ++batchPermits;
@@ -129,7 +136,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return sendBatchedModifications(false, false);
     }
 
-    protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
             if (batchedModifications == null) {
@@ -167,7 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
                 actorContext.getTransactionCommitOperationTimeout());
             sent.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object success) {
+                public void onComplete(final Throwable failure, final Object success) {
                     if (failure != null) {
                         LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
                         failedModification = failure;
@@ -183,16 +190,23 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void executeModification(AbstractModification modification) {
+    public void executeModification(final AbstractModification modification, final Boolean havePermit) {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
 
-        final boolean havePermit = failedModification == null && acquireOperation();
-        batchModification(modification, havePermit);
+        final boolean permitToRelease;
+        if (havePermit == null) {
+            permitToRelease = failedModification == null && acquireOperation();
+        } else {
+            permitToRelease = havePermit.booleanValue();
+        }
+
+        batchModification(modification, permitToRelease);
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+            final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
@@ -209,14 +223,14 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        final boolean havePermit = acquireOperation();
+        final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 // We have previously acquired an operation, now release it, no matter what happened
-                if (havePermit) {
+                if (permitToRelease) {
                     limiter.release();
                 }
 
@@ -245,15 +259,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
      * @return True if a permit was successfully acquired, false otherwise
      */
     private boolean acquireOperation() {
-        if (isOperationHandOffComplete()) {
-            if (limiter.acquire()) {
-                return true;
-            }
+        Preconditions.checkState(isOperationHandOffComplete(),
+            "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
+            getIdentifier(), actor);
 
-            LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(),
-                actor);
+        if (limiter.acquire()) {
+            return true;
         }
 
+        LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
         return false;
     }
 
index b62b056..543834c 100644 (file)
@@ -20,13 +20,13 @@ import scala.concurrent.Future;
 interface TransactionContext {
     void closeTransaction();
 
-    Future<ActorSelection> readyTransaction();
+    Future<ActorSelection> readyTransaction(Boolean havePermit);
 
-    void executeModification(AbstractModification modification);
+    void executeModification(AbstractModification modification, Boolean havePermit);
 
-    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
+    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
 
-    Future<Object> directCommit();
+    Future<Object> directCommit(Boolean havePermit);
 
     /**
      * Invoked by {@link TransactionContextWrapper} when it has finished handing
index 865cc60..d4b52e5 100644 (file)
@@ -42,7 +42,7 @@ final class TransactionContextCleanup extends FinalizablePhantomReference<Transa
 
     private final TransactionContext cleanup;
 
-    private TransactionContextCleanup(TransactionProxy referent, TransactionContext cleanup) {
+    private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
         super(referent, QUEUE);
         this.cleanup = cleanup;
     }
index a126ce9..0e12609 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -37,16 +38,19 @@ class TransactionContextWrapper {
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
-    private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+    private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
     private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
     private final String shardName;
 
     /**
      * The resulting TransactionContext.
      */
     private volatile TransactionContext transactionContext;
-
-    private final OperationLimiter limiter;
+    @GuardedBy("queuedTxOperations")
+    private TransactionContext deferredTransactionContext;
+    @GuardedBy("queuedTxOperations")
+    private boolean pendingEnqueue;
 
     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
             final String shardName) {
@@ -67,35 +71,77 @@ class TransactionContextWrapper {
     }
 
     /**
-     * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+     * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+     * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+     * context is not available.
      */
     private void enqueueTransactionOperation(final TransactionOperation operation) {
-        final boolean invokeOperation;
+        // We have three things to do here:
+        // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+        // - acquire a permit for the operation if we still need to enqueue it
+        // - enqueue the operation
+        //
+        // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+        // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+        // complications are:
+        // - this method may be called from the thread invoking executePriorTransactionOperations()
+        // - user may be violating API contract of using the transaction from a single thread
+
+        // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+        // the lock, we will assert that we will be enqueing another operation.
+        final TransactionContext contextOnEntry;
         synchronized (queuedTxOperations) {
-            if (transactionContext == null) {
-                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
-
-                queuedTxOperations.add(operation);
-                invokeOperation = false;
-            }  else {
-                invokeOperation = true;
+            contextOnEntry = transactionContext;
+            if (contextOnEntry == null) {
+                Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+                        identifier);
+                pendingEnqueue = true;
             }
         }
 
-        if (invokeOperation) {
-            operation.invoke(transactionContext);
-        } else {
-            if (!limiter.acquire()) {
+        // Short-circuit if there is a context
+        if (contextOnEntry != null) {
+            operation.invoke(transactionContext, null);
+            return;
+        }
+
+        boolean cleanupEnqueue = true;
+        TransactionContext finishHandoff = null;
+        try {
+            // Acquire the permit,
+            final boolean havePermit = limiter.acquire();
+            if (!havePermit) {
                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
                     shardName);
             }
+
+            // Ready to enqueue, take the lock again and append the operation
+            synchronized (queuedTxOperations) {
+                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+                pendingEnqueue = false;
+                cleanupEnqueue = false;
+                finishHandoff = deferredTransactionContext;
+                deferredTransactionContext = null;
+            }
+        } finally {
+            if (cleanupEnqueue) {
+                synchronized (queuedTxOperations) {
+                    pendingEnqueue = false;
+                    finishHandoff = deferredTransactionContext;
+                    deferredTransactionContext = null;
+                }
+            }
+            if (finishHandoff != null) {
+                executePriorTransactionOperations(finishHandoff);
+            }
         }
     }
 
     void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
         } else {
             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
             // callback to be executed after the Tx is created.
@@ -114,17 +160,23 @@ class TransactionContextWrapper {
             // in case a TransactionOperation results in another transaction operation being
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
-            final Collection<TransactionOperation> operationsBatch;
+            final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
             synchronized (queuedTxOperations) {
                 if (queuedTxOperations.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    localTransactionContext.operationHandOffComplete();
-                    if (!localTransactionContext.usesOperationLimiting()) {
-                        limiter.releaseAll();
+                    if (!pendingEnqueue) {
+                        // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+                        localTransactionContext.operationHandOffComplete();
+                        if (!localTransactionContext.usesOperationLimiting()) {
+                            limiter.releaseAll();
+                        }
+
+                        // This is null-to-non-null transition after which we are releasing the lock and not doing
+                        // any further processing.
+                        transactionContext = localTransactionContext;
+                    } else {
+                        deferredTransactionContext = localTransactionContext;
                     }
-                    transactionContext = localTransactionContext;
-                    break;
+                    return;
                 }
 
                 operationsBatch = new ArrayList<>(queuedTxOperations);
@@ -134,32 +186,31 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for (TransactionOperation oper : operationsBatch) {
-                oper.invoke(localTransactionContext);
+            for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+                oper.getKey().invoke(localTransactionContext, oper.getValue());
             }
         }
     }
 
     Future<ActorSelection> readyTransaction() {
         // avoid the creation of a promise and a TransactionOperation
-        if (transactionContext != null) {
-            return transactionContext.readyTransaction();
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null);
         }
 
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(TransactionContext newTransactionContext) {
-                promise.completeWith(newTransactionContext.readyTransaction());
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
             }
         });
 
         return promise.future();
     }
 
-    public OperationLimiter getLimiter() {
+    OperationLimiter getLimiter() {
         return limiter;
     }
-
-
 }
index 3defd2c..962d261 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import org.eclipse.jdt.annotation.Nullable;
+
 /**
  * Abstract superclass for transaction operations which should be executed
  * on a {@link TransactionContext} at a later point in time.
@@ -16,6 +18,8 @@ abstract class TransactionOperation {
      * Execute the delayed operation.
      *
      * @param transactionContext the TransactionContext
+     * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
+     *                   attempt to acquire a permit.
      */
-    protected abstract void invoke(TransactionContext transactionContext);
+    protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
 }
index 7bb62aa..aca4e41 100644 (file)
@@ -92,8 +92,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(final TransactionContext transactionContext) {
-                transactionContext.executeRead(readCmd, proxyFuture);
+            public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+                transactionContext.executeRead(readCmd, proxyFuture, havePermit);
             }
         });
 
@@ -169,8 +169,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
-            protected void invoke(final TransactionContext transactionContext) {
-                transactionContext.executeModification(modification);
+            protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+                transactionContext.executeModification(modification, havePermit);
             }
         });
     }
@@ -203,7 +203,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
-                public void invoke(final TransactionContext transactionContext) {
+                public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
                     transactionContext.closeTransaction();
                 }
             });
@@ -257,14 +257,15 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             final Promise promise = akka.dispatch.Futures.promise();
             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
-                public void invoke(final TransactionContext newTransactionContext) {
-                    promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef));
+                public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                    promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
+                        havePermit));
                 }
             });
             future = promise.future();
         } else {
             // avoid the creation of a promise and a TransactionOperation
-            future = getDirectCommitFuture(transactionContext, operationCallbackRef);
+            future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
         }
 
         return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
@@ -272,12 +273,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
-            final OperationCallback.Reference operationCallbackRef) {
+            final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
         TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
                 txContextFactory.getActorContext());
         operationCallbackRef.set(rateLimitingCallback);
         rateLimitingCallback.run();
-        return transactionContext.directCommit();
+        return transactionContext.directCommit(havePermit);
     }
 
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
index 016b4b7..e63501d 100644 (file)
@@ -68,7 +68,8 @@ public class LocalTransactionContextTest {
     public void testWrite() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -76,14 +77,15 @@ public class LocalTransactionContextTest {
     public void testMerge() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
     }
 
     @Test
     public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
     }
 
@@ -95,7 +97,7 @@ public class LocalTransactionContextTest {
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
             .read(yangInstanceIdentifier);
         localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+                SettableFuture.create(), null);
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -104,7 +106,7 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
-                SettableFuture.<Boolean>create());
+                SettableFuture.create(), null);
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
@@ -114,7 +116,7 @@ public class LocalTransactionContextTest {
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
 
-        Future<ActorSelection> future = localTransactionContext.readyTransaction();
+        Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
         assertTrue(future.isCompleted());
 
         verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
@@ -127,8 +129,10 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
-        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+            null);
 
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
@@ -142,8 +146,10 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
-        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+            null);
 
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
@@ -156,19 +162,19 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
 
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
-        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
 
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
 
         doReadyWithExpectedError(error);
     }
 
-    private void doReadyWithExpectedError(RuntimeException expError) {
+    private void doReadyWithExpectedError(final RuntimeException expError) {
         LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
 
-        localTransactionContext.readyTransaction();
+        localTransactionContext.readyTransaction(null);
     }
 }
index 526ad77..3f9b813 100644 (file)
@@ -72,8 +72,8 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
      */
     @Test
     public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
         assertEquals(2, limiter.availablePermits());
 
         Future<Object> future = txContext.sendBatchedModifications();
@@ -90,12 +90,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
                 assertEquals(4, limiter.availablePermits());
 
                 // The transaction has failed, no throttling should occur
-                txContext.executeModification(DELETE);
+                txContext.executeModification(DELETE, null);
                 assertEquals(4, limiter.availablePermits());
 
                 // Executing a read should result in immediate failure
                 final SettableFuture<Boolean> readFuture = SettableFuture.create();
-                txContext.executeRead(new DataExists(), readFuture);
+                txContext.executeRead(new DataExists(), readFuture, null);
                 assertTrue(readFuture.isDone());
                 try {
                     readFuture.get();
@@ -106,7 +106,7 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
             }
         });
 
-        future = txContext.directCommit();
+        future = txContext.directCommit(null);
 
         msg = kit.expectMsgClass(BatchedModifications.class);
         // Modification should have been thrown away by the dropped transmit induced by executeRead()
@@ -131,12 +131,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest {
      */
     @Test
     public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
+        txContext.executeModification(DELETE, null);
         assertEquals(0, limiter.availablePermits());
-        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE, null);
         // Last acquire should have failed ...
         assertEquals(0, limiter.availablePermits());
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.