BUG 3019 : Fix Operation throttling for modification batching scenarios 71/22371/2
authorMoiz Raja <moraja@cisco.com>
Wed, 10 Jun 2015 01:37:35 +0000 (18:37 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 11 Jun 2015 15:47:32 +0000 (15:47 +0000)
This patch straightens out where exactly limiting is done.

A TransactionProxy creates a TransactionContext for every shard on
which a transaction needs to be done. There are 3 types of TransactionContexts.
NoOpTransactionContext, LocalTransactionContext and RemoteTransactionContext.
When a operation is done on TransactionProxy it does not know which of these
TransactionContexts it should create so it first createas a TransactionContextWrapper.
All operations on TransactionProxy are then queued up in the TransactionContextWrapper
till we determine which TransactionContext to create. This patch creates an
OperationLimiter per TransactionContextWrapper. Everytime an operation is enqueued
we acquire a permit.

When the TransactionContext is finally created we do different things depending on
the TransactionContext. For NoOp and Local TransactionContexts we completely ignore
the limiter - that is for these TransactionContexts there is no limiting done. For
RemoteTransactionContext we do limiting. RemoteTransactionContext does not acquire
Operation permits till it is made visible by the TransactionContextWrapper - this is
signaled be the setting of the handOffComplete flag in AbstractTransactionContext.
After that RemoteTransactionContext takes over the business of acquiring permits.
OperationLimiter which also serves as the Operation completion handler is the only
component that releases the permits.

Another thing which this patch addresses is which configuration option we use for
operation limiting. We now use ShardBatchedModificationCount instead of the mailbox
limit from akka.conf. This removes the possibility of mis-configuration where
making ShardedBatchedModificationCount higher than mailbox limit could cause
unexpected blocking.

Change-Id: I571ba5278630e5166be6bcb3ff8e1c527c5e3343
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 21ccb7510c28e824d6441d48604aec7467d44710)

20 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index cfbf945..e273762 100644 (file)
@@ -51,7 +51,7 @@ operational.persistent=false
 # The number of transaction modification operations (put, merge, delete) to batch before sending to the 
 # shard transaction actor. Batching improves performance as less modifications messages are sent to the 
 # actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
-#shard-batched-modification-count=100
+#shard-batched-modification-count=1000
 
 # The maximum amount of time for akka operations (remote or local) to complete before failing.
 #operation-timeout-in-seconds=5
index 571899b..97a0205 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
@@ -15,12 +14,12 @@ import org.slf4j.LoggerFactory;
 
 abstract class AbstractTransactionContext implements TransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-    private final OperationLimiter limiter;
+    private final TransactionIdentifier transactionIdentifier;
     private long modificationCount = 0;
-    private boolean handoffComplete;
+    private boolean handOffComplete;
 
-    protected AbstractTransactionContext(final OperationLimiter limiter) {
-        this.limiter = Preconditions.checkNotNull(limiter);
+    protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+        this.transactionIdentifier = transactionIdentifier;
     }
 
     /**
@@ -29,44 +28,7 @@ abstract class AbstractTransactionContext implements TransactionContext {
      * @return Transaction identifier.
      */
     @Nonnull protected final TransactionIdentifier getIdentifier() {
-        return limiter.getIdentifier();
-    }
-
-    /**
-     * Return the operation limiter associated with this context.
-     * @return Operation limiter.
-     */
-    @Nonnull protected final OperationLimiter getLimiter() {
-        return limiter;
-    }
-
-    /**
-     * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
-     *
-     * @return True if this context is responsible for throttling.
-     */
-    protected final boolean isOperationHandoffComplete() {
-        return handoffComplete;
-    }
-
-    /**
-     * Acquire operation from the limiter if the handoff has completed. If
-     * the handoff is still ongoing, this method does nothing.
-     */
-    protected final void acquireOperation() {
-        if (handoffComplete) {
-            limiter.acquire();
-        }
-    }
-
-    /**
-     * Acquire operation from the limiter if the handoff has NOT completed. If
-     * the handoff has completed, this method does nothing.
-     */
-    protected final void releaseOperation() {
-        if (!handoffComplete) {
-            limiter.release();
-        }
+        return transactionIdentifier;
     }
 
     protected final void incrementModificationCount() {
@@ -78,7 +40,16 @@ abstract class AbstractTransactionContext implements TransactionContext {
     }
 
     @Override
-    public final void operationHandoffComplete() {
-        handoffComplete = true;
+    public final void operationHandOffComplete() {
+        handOffComplete = true;
+    }
+
+    protected boolean isOperationHandOffComplete(){
+        return handOffComplete;
+    }
+
+    @Override
+    public boolean usesOperationLimiting() {
+        return false;
     }
 }
index 976e613..19646f2 100644 (file)
@@ -85,11 +85,12 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
         transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getLimiter()));
+                parent.getIdentifier()));
     }
 
     final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
+        final TransactionContextWrapper transactionContextAdapter =
+                new TransactionContextWrapper(parent.getIdentifier(), actorContext);
 
         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
         if(findPrimaryFuture.isCompleted()) {
@@ -174,11 +175,13 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
-    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
+    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+                                                                    final TransactionProxy parent) {
+
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -191,7 +194,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readWrite, parent.getLimiter()) {
+                return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -204,7 +207,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 4140d33..fc12b37 100644 (file)
@@ -45,7 +45,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
-    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
 
     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
index 9b0accd..e62d15b 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -30,8 +30,8 @@ import scala.concurrent.Future;
 abstract class LocalTransactionContext extends AbstractTransactionContext {
     private final DOMStoreTransaction txDelegate;
 
-    LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
-        super(limiter);
+    LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+        super(identifier);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
     }
 
@@ -43,21 +43,18 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().write(path, data);
-        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().merge(path, data);
-        releaseOperation();
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
         getWriteDelegate().delete(path);
-        releaseOperation();
     }
 
     @Override
@@ -66,13 +63,11 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
-                releaseOperation();
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                releaseOperation();
             }
         });
     }
@@ -83,39 +78,30 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(final Boolean result) {
                 proxyFuture.set(result);
-                releaseOperation();
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                releaseOperation();
             }
         });
     }
 
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
-        acquireOperation();
         return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
-        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
-        return operationFuture;
-    }
-
     @Override
     public Future<ActorSelection> readyTransaction() {
         final LocalThreePhaseCommitCohort cohort = ready();
-        return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
+        return cohort.initiateCoordinatedCommit();
     }
 
     @Override
     public Future<Object> directCommit() {
         final LocalThreePhaseCommitCohort cohort = ready();
-        return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
+        return cohort.initiateDirectCommit();
     }
 
     @Override
@@ -126,6 +112,5 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     @Override
     public void closeTransaction() {
         txDelegate.close();
-        releaseOperation();
     }
 }
index a142142..2094cd2 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -24,8 +25,8 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     private final Throwable failure;
 
-    public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
-        super(limiter);
+    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+        super(identifier);
         this.failure = failure;
     }
 
@@ -42,39 +43,33 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public Future<Object> directCommit() {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
-        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
-        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-        releaseOperation();
 
         final Throwable t;
         if (failure instanceof NoShardLeaderException) {
@@ -88,7 +83,6 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
-        releaseOperation();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
 }
index b422309..34a7ebf 100644 (file)
@@ -26,6 +26,7 @@ public class OperationLimiter extends OnComplete<Object> {
     private final TransactionIdentifier identifier;
     private final long acquireTimeout;
     private final Semaphore semaphore;
+    private final int maxPermits;
 
     OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -34,6 +35,7 @@ public class OperationLimiter extends OnComplete<Object> {
         this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
 
         Preconditions.checkArgument(maxPermits >= 0);
+        this.maxPermits = maxPermits;
         this.semaphore = new Semaphore(maxPermits);
     }
 
@@ -41,7 +43,7 @@ public class OperationLimiter extends OnComplete<Object> {
         acquire(1);
     }
 
-    private void acquire(final int acquirePermits) {
+    void acquire(final int acquirePermits) {
         try {
             if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
                 LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
@@ -55,10 +57,6 @@ public class OperationLimiter extends OnComplete<Object> {
         }
     }
 
-    void release() {
-        this.semaphore.release();
-    }
-
     @Override
     public void onComplete(final Throwable throwable, final Object message) {
         if (message instanceof BatchedModificationsReply) {
@@ -73,7 +71,14 @@ public class OperationLimiter extends OnComplete<Object> {
     }
 
     @VisibleForTesting
-    Semaphore getSemaphore() {
-        return semaphore;
+    int availablePermits(){
+        return semaphore.availablePermits();
+    }
+
+    /**
+     * Release all the permits
+     */
+    public void releaseAll() {
+        this.semaphore.release(maxPermits-availablePermits());
     }
 }
index 7e8a2a0..20074c1 100644 (file)
@@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -44,14 +46,16 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
+    private final OperationLimiter limiter;
 
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected RemoteTransactionContext(ActorSelection actor,
+    protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(limiter);
+        super(identifier);
+        this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
@@ -59,7 +63,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+        operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
@@ -277,4 +281,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
+
+    /**
+     * Acquire operation from the limiter if the hand-off has completed. If
+     * the hand-off is still ongoing, this method does nothing.
+     */
+    private final void acquireOperation() {
+        if (isOperationHandOffComplete()) {
+            limiter.acquire();
+        }
+    }
+
+    @Override
+    public boolean usesOperationLimiting() {
+        return true;
+    }
 }
index afd748f..176073e 100644 (file)
@@ -75,7 +75,7 @@ final class RemoteTransactionContextSupport {
     }
 
     private OperationLimiter getOperationLimiter() {
-        return parent.getLimiter();
+        return transactionContextAdapter.getLimiter();
     }
 
     private TransactionIdentifier getIdentifier() {
@@ -160,7 +160,7 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
@@ -168,7 +168,7 @@ final class RemoteTransactionContextSupport {
             IllegalArgumentException exception = new IllegalArgumentException(String.format(
                     "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-            localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
         }
 
         transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
@@ -189,11 +189,11 @@ final class RemoteTransactionContextSupport {
         final TransactionContext ret;
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
-                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+            ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor,
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
         } else {
-            ret = new RemoteTransactionContext(transactionActor, getActorContext(),
-                isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+            ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(),
+                isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
         }
 
         if(parent.getType() == TransactionType.READ_ONLY) {
index e5130ed..6a54200 100644 (file)
@@ -45,5 +45,11 @@ interface TransactionContext {
      * Implementations can rely on the wrapper calling this operation in a synchronized
      * block, so they do not need to ensure visibility of this state transition themselves.
      */
-    void operationHandoffComplete();
+    void operationHandOffComplete();
+
+    /**
+     * A TransactionContext that uses Operation limiting should return true else false
+     * @return
+     */
+    boolean usesOperationLimiting();
 }
index b08d419..89a6a97 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Collection;
 import java.util.List;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -37,6 +38,8 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
 
+    private final TransactionIdentifier identifier;
+
     /**
      * The resulting TransactionContext.
      */
@@ -44,8 +47,11 @@ class TransactionContextWrapper {
 
     private final OperationLimiter limiter;
 
-    TransactionContextWrapper(final OperationLimiter limiter) {
-        this.limiter = Preconditions.checkNotNull(limiter);
+    TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.limiter = new OperationLimiter(identifier,
+                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+                actorContext.getDatastoreContext().getOperationTimeoutInSeconds());
     }
 
     TransactionContext getTransactionContext() {
@@ -53,7 +59,7 @@ class TransactionContextWrapper {
     }
 
     TransactionIdentifier getIdentifier() {
-        return limiter.getIdentifier();
+        return identifier;
     }
 
     /**
@@ -106,7 +112,10 @@ class TransactionContextWrapper {
                 if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
-                    localTransactionContext.operationHandoffComplete();
+                    localTransactionContext.operationHandOffComplete();
+                    if(!localTransactionContext.usesOperationLimiting()){
+                        limiter.releaseAll();
+                    }
                     transactionContext = localTransactionContext;
                     break;
                 }
@@ -140,4 +149,10 @@ class TransactionContextWrapper {
 
         return promise.future();
     }
+
+    public OperationLimiter getLimiter() {
+        return limiter;
+    }
+
+
 }
index 1bda781..f7cb27b 100644 (file)
@@ -53,7 +53,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
-    private final OperationLimiter limiter;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
 
@@ -64,11 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.limiter = new OperationLimiter(getIdentifier(),
-            getActorContext().getTransactionOutstandingOperationLimit(),
-            getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
         LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
@@ -337,8 +331,4 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     ActorContext getActorContext() {
         return txContextFactory.getActorContext();
     }
-
-    OperationLimiter getLimiter() {
-        return limiter;
-    }
 }
index 733543a..c441663 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationLimiter;
 import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -34,10 +35,10 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
 
     private final String transactionPath;
 
-    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
+    public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+        super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
index 6640898..b07cf28 100644 (file)
@@ -25,7 +25,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
@@ -92,7 +91,6 @@ public class ActorContext {
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
     private TransactionRateLimiter txRateLimiter;
-    private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
@@ -129,7 +127,6 @@ public class ActorContext {
             selfAddressHostPort = null;
         }
 
-        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     private void setCachedProperties() {
@@ -465,18 +462,6 @@ public class ActorContext {
         return builder.toString();
     }
 
-    /**
-     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
-     * should begin throttling the operations
-     *
-     * Parking reading this configuration here because we need to get to the actor system settings
-     *
-     * @return
-     */
-    public int getTransactionOutstandingOperationLimit(){
-        return transactionOutstandingOperationLimit;
-    }
-
     /**
      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
      * us to create a timer for pretty much anything.
index 09125f1..71c440e 100644 (file)
@@ -167,7 +167,7 @@ module distributed-datastore-provider {
          }
 
          leaf shard-batched-modification-count {
-            default 100;
+            default 1000;
             type non-zero-uint32-type;
             description "The number of transaction modification operations (put, merge, delete) to
                         batch before sending to the shard transaction actor. Batching improves
index 8e9d79e..fcc18ed 100644 (file)
@@ -66,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -98,13 +99,23 @@ public abstract class AbstractTransactionProxyTest {
                         public String findShard(YangInstanceIdentifier path) {
                             return "junk";
                         }
+                    }).put(
+                    "cars", new ShardStrategy() {
+                        @Override
+                        public String findShard(YangInstanceIdentifier path) {
+                            return "cars";
+                        }
                     }).build();
         }
 
         @Override
         public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
-            return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
-                    Optional.of("junk") : Optional.<String>absent();
+            if(TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
+                return Optional.of("junk");
+            } else if(CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)){
+                return Optional.of("cars");
+            }
+            return Optional.<String>absent();
         }
     };
 
@@ -151,7 +162,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         mockComponentFactory = TransactionContextFactory.create(mockActorContext);
 
@@ -342,8 +352,6 @@ public abstract class AbstractTransactionProxyTest {
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         return actorRef;
     }
 
index b04612e..0545bba 100644 (file)
@@ -36,7 +36,7 @@ public class LocalTransactionContextTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -54,7 +54,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
-        verify(limiter).release();
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -63,7 +62,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
-        verify(limiter).release();
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -71,7 +69,6 @@ public class LocalTransactionContextTest {
     public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         localTransactionContext.deleteData(yangInstanceIdentifier);
-        verify(limiter).release();
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
     }
 
@@ -82,7 +79,6 @@ public class LocalTransactionContextTest {
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
         localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
-        verify(limiter).release();
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -91,7 +87,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
-        verify(limiter).release();
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
@@ -104,7 +99,6 @@ public class LocalTransactionContextTest {
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(readWriteTransaction).ready();
         localTransactionContext.readyTransaction();
-        verify(limiter).onComplete(null, null);
         verify(readWriteTransaction).ready();
     }
 
index 40ce84b..7d49090 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -25,21 +24,21 @@ public class OperationLimiterTest {
     public void testOnComplete() throws Exception {
         int permits = 10;
         OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
-        Semaphore semaphore = limiter.getSemaphore();
-        semaphore.acquire(permits);
+        limiter.acquire(permits);
         int availablePermits = 0;
 
         limiter.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new IllegalArgumentException());
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new BatchedModificationsReply(4));
         availablePermits += 4;
-        assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", availablePermits, limiter.availablePermits());
     }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java
new file mode 100644 (file)
index 0000000..bc98f9a
--- /dev/null
@@ -0,0 +1,44 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionContextWrapperTest {
+
+    @Mock
+    TransactionIdentifier identifier;
+
+    @Mock
+    ActorContext actorContext;
+
+    @Mock
+    TransactionContext transactionContext;
+
+    TransactionContextWrapper transactionContextWrapper;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+        transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext);
+    }
+
+    @Test
+    public void testExecutePriorTransactionOperations(){
+        for(int i=0;i<100;i++) {
+            transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
+        }
+        assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
+
+        transactionContextWrapper.executePriorTransactionOperations(transactionContext);
+
+        assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
+    }
+}
\ No newline at end of file
index 26d51cb..4301a72 100644 (file)
@@ -798,6 +798,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         throttleOperation(operation, 1, true);
     }
 
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+    }
+
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
                 Optional.<DataTree>absent());
@@ -809,11 +813,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+        // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+        // we now allow one extra permit to be allowed for ready
+        doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+                shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
 
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
@@ -821,6 +828,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         if(shardFound) {
             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+                    when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
         } else {
             doReturn(Futures.failed(new Exception("not found")))
                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -845,9 +855,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expected, (end-start)), (end - start) > expected);
+                expectedCompletionTime, (end-start)),
+                ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
 
     }
 
@@ -859,8 +869,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -903,8 +911,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -952,7 +958,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -968,7 +973,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -986,7 +990,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testWriteThrottlingWhenShardNotFound(){
         // Confirm that there is no throttling when the Shard is not found
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1005,7 +1008,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1022,7 +1024,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1039,7 +1040,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardNotFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1056,7 +1056,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1074,7 +1073,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1122,7 +1120,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1137,7 +1134,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1326,7 +1322,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyThrottlingWithTwoTransactionContexts(){
-
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1340,11 +1335,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+                // Trying to write to Cars will cause another transaction context to get created
+                transactionProxy.write(CarsModel.BASE_PATH, carsNode);
 
+                // Now ready should block for both transaction contexts
                 transactionProxy.ready();
             }
-        }, 2, true);
+        }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
     }
 
     private void testModificationOperationBatching(TransactionType type) throws Exception {
@@ -1526,8 +1523,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.