BUG 3019 : Fix Operation throttling for modification batching scenarios 71/22371/2
authorMoiz Raja <moraja@cisco.com>
Wed, 10 Jun 2015 01:37:35 +0000 (18:37 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 11 Jun 2015 15:47:32 +0000 (15:47 +0000)
This patch straightens out where exactly limiting is done.

A TransactionProxy creates a TransactionContext for every shard on
which a transaction needs to be done. There are 3 types of TransactionContexts.
NoOpTransactionContext, LocalTransactionContext and RemoteTransactionContext.
When a operation is done on TransactionProxy it does not know which of these
TransactionContexts it should create so it first createas a TransactionContextWrapper.
All operations on TransactionProxy are then queued up in the TransactionContextWrapper
till we determine which TransactionContext to create. This patch creates an
OperationLimiter per TransactionContextWrapper. Everytime an operation is enqueued
we acquire a permit.

When the TransactionContext is finally created we do different things depending on
the TransactionContext. For NoOp and Local TransactionContexts we completely ignore
the limiter - that is for these TransactionContexts there is no limiting done. For
RemoteTransactionContext we do limiting. RemoteTransactionContext does not acquire
Operation permits till it is made visible by the TransactionContextWrapper - this is
signaled be the setting of the handOffComplete flag in AbstractTransactionContext.
After that RemoteTransactionContext takes over the business of acquiring permits.
OperationLimiter which also serves as the Operation completion handler is the only
component that releases the permits.

Another thing which this patch addresses is which configuration option we use for
operation limiting. We now use ShardBatchedModificationCount instead of the mailbox
limit from akka.conf. This removes the possibility of mis-configuration where
making ShardedBatchedModificationCount higher than mailbox limit could cause
unexpected blocking.

Change-Id: I571ba5278630e5166be6bcb3ff8e1c527c5e3343
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 21ccb7510c28e824d6441d48604aec7467d44710)

20 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index cfbf9450aa2b77661f21bd00904a995b48c9c3d7..e27376290e3503aaba2e264b3407e4be9de44d92 100644 (file)
@@ -51,7 +51,7 @@ operational.persistent=false
 # The number of transaction modification operations (put, merge, delete) to batch before sending to the 
 # shard transaction actor. Batching improves performance as less modifications messages are sent to the 
 # actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
-#shard-batched-modification-count=100
+#shard-batched-modification-count=1000
 
 # The maximum amount of time for akka operations (remote or local) to complete before failing.
 #operation-timeout-in-seconds=5
index 571899ba14343f7b8f189571e18f708b397fc86b..97a0205ff2b7bc1a94bfe88c23e26a642b2a0866 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
@@ -15,12 +14,12 @@ import org.slf4j.LoggerFactory;
 
 abstract class AbstractTransactionContext implements TransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-    private final OperationLimiter limiter;
+    private final TransactionIdentifier transactionIdentifier;
     private long modificationCount = 0;
-    private boolean handoffComplete;
+    private boolean handOffComplete;
 
-    protected AbstractTransactionContext(final OperationLimiter limiter) {
-        this.limiter = Preconditions.checkNotNull(limiter);
+    protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+        this.transactionIdentifier = transactionIdentifier;
     }
 
     /**
@@ -29,44 +28,7 @@ abstract class AbstractTransactionContext implements TransactionContext {
      * @return Transaction identifier.
      */
     @Nonnull protected final TransactionIdentifier getIdentifier() {
-        return limiter.getIdentifier();
-    }
-
-    /**
-     * Return the operation limiter associated with this context.
-     * @return Operation limiter.
-     */
-    @Nonnull protected final OperationLimiter getLimiter() {
-        return limiter;
-    }
-
-    /**
-     * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
-     *
-     * @return True if this context is responsible for throttling.
-     */
-    protected final boolean isOperationHandoffComplete() {
-        return handoffComplete;
-    }
-
-    /**
-     * Acquire operation from the limiter if the handoff has completed. If
-     * the handoff is still ongoing, this method does nothing.
-     */
-    protected final void acquireOperation() {
-        if (handoffComplete) {
-            limiter.acquire();
-        }
-    }
-
-    /**
-     * Acquire operation from the limiter if the handoff has NOT completed. If
-     * the handoff has completed, this method does nothing.
-     */
-    protected final void releaseOperation() {
-        if (!handoffComplete) {
-            limiter.release();
-        }
+        return transactionIdentifier;
     }
 
     protected final void incrementModificationCount() {
@@ -78,7 +40,16 @@ abstract class AbstractTransactionContext implements TransactionContext {
     }
 
     @Override
-    public final void operationHandoffComplete() {
-        handoffComplete = true;
+    public final void operationHandOffComplete() {
+        handOffComplete = true;
+    }
+
+    protected boolean isOperationHandOffComplete(){
+        return handOffComplete;
+    }
+
+    @Override
+    public boolean usesOperationLimiting() {
+        return false;
     }
 }
index 976e613e8ea183b293d666bc56c8fd8cbc358123..19646f27fc9a1f42fbbd716dce0f0e721ad0ef0a 100644 (file)
@@ -85,11 +85,12 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
         transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getLimiter()));
+                parent.getIdentifier()));
     }
 
     final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
+        final TransactionContextWrapper transactionContextAdapter =
+                new TransactionContextWrapper(parent.getIdentifier(), actorContext);
 
         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
         if(findPrimaryFuture.isCompleted()) {
@@ -174,11 +175,13 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
-    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
+    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+                                                                    final TransactionProxy parent) {
+
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -191,7 +194,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readWrite, parent.getLimiter()) {
+                return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -204,7 +207,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 4140d3323b33be4955b720eb8238644ebb3f305e..fc12b3766f770906b489b02c61a750fcf2178d5f 100644 (file)
@@ -45,7 +45,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
-    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
 
     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
index 9b0accd455117f464d556fa8fc8c73f1990d0b23..e62d15b7ffc2bb05dca59e0fdae16739e0e778e3 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -30,8 +30,8 @@ import scala.concurrent.Future;
 abstract class LocalTransactionContext extends AbstractTransactionContext {
     private final DOMStoreTransaction txDelegate;
 
-    LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
-        super(limiter);
+    LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+        super(identifier);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
     }
 
@@ -43,21 +43,18 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().write(path, data);
-        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().merge(path, data);
-        releaseOperation();
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
         getWriteDelegate().delete(path);
-        releaseOperation();
     }
 
     @Override
@@ -66,13 +63,11 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
-                releaseOperation();
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                releaseOperation();
             }
         });
     }
@@ -83,39 +78,30 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(final Boolean result) {
                 proxyFuture.set(result);
-                releaseOperation();
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                releaseOperation();
             }
         });
     }
 
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
-        acquireOperation();
         return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
-        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
-        return operationFuture;
-    }
-
     @Override
     public Future<ActorSelection> readyTransaction() {
         final LocalThreePhaseCommitCohort cohort = ready();
-        return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
+        return cohort.initiateCoordinatedCommit();
     }
 
     @Override
     public Future<Object> directCommit() {
         final LocalThreePhaseCommitCohort cohort = ready();
-        return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
+        return cohort.initiateDirectCommit();
     }
 
     @Override
@@ -126,6 +112,5 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     @Override
     public void closeTransaction() {
         txDelegate.close();
-        releaseOperation();
     }
 }
index a1421429405506d9c4ea1c0f0d4f848bc11c7b78..2094cd2f77ff1a8399f88ce0bb4247603c484cb3 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -24,8 +25,8 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     private final Throwable failure;
 
-    public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
-        super(limiter);
+    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+        super(identifier);
         this.failure = failure;
     }
 
@@ -42,39 +43,33 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public Future<Object> directCommit() {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
-        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
-        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-        releaseOperation();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-        releaseOperation();
 
         final Throwable t;
         if (failure instanceof NoShardLeaderException) {
@@ -88,7 +83,6 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
-        releaseOperation();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
 }
index b42230971b2fe276ea5bc8d39f25a3d2f455de83..34a7ebf8f233f8ab3d14c57e3e63b17357035028 100644 (file)
@@ -26,6 +26,7 @@ public class OperationLimiter extends OnComplete<Object> {
     private final TransactionIdentifier identifier;
     private final long acquireTimeout;
     private final Semaphore semaphore;
+    private final int maxPermits;
 
     OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -34,6 +35,7 @@ public class OperationLimiter extends OnComplete<Object> {
         this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
 
         Preconditions.checkArgument(maxPermits >= 0);
+        this.maxPermits = maxPermits;
         this.semaphore = new Semaphore(maxPermits);
     }
 
@@ -41,7 +43,7 @@ public class OperationLimiter extends OnComplete<Object> {
         acquire(1);
     }
 
-    private void acquire(final int acquirePermits) {
+    void acquire(final int acquirePermits) {
         try {
             if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
                 LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
@@ -55,10 +57,6 @@ public class OperationLimiter extends OnComplete<Object> {
         }
     }
 
-    void release() {
-        this.semaphore.release();
-    }
-
     @Override
     public void onComplete(final Throwable throwable, final Object message) {
         if (message instanceof BatchedModificationsReply) {
@@ -73,7 +71,14 @@ public class OperationLimiter extends OnComplete<Object> {
     }
 
     @VisibleForTesting
-    Semaphore getSemaphore() {
-        return semaphore;
+    int availablePermits(){
+        return semaphore.availablePermits();
+    }
+
+    /**
+     * Release all the permits
+     */
+    public void releaseAll() {
+        this.semaphore.release(maxPermits-availablePermits());
     }
 }
index 7e8a2a00ebe3fb701cb66f0991ba1a0fee9e2cf0..20074c10289908d20839fb2fbbd43e0089a7d24c 100644 (file)
@@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -44,14 +46,16 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
+    private final OperationLimiter limiter;
 
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected RemoteTransactionContext(ActorSelection actor,
+    protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(limiter);
+        super(identifier);
+        this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
@@ -59,7 +63,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+        operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
@@ -277,4 +281,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
+
+    /**
+     * Acquire operation from the limiter if the hand-off has completed. If
+     * the hand-off is still ongoing, this method does nothing.
+     */
+    private final void acquireOperation() {
+        if (isOperationHandOffComplete()) {
+            limiter.acquire();
+        }
+    }
+
+    @Override
+    public boolean usesOperationLimiting() {
+        return true;
+    }
 }
index afd748fd484c5303f27558497bb586ae137bc057..176073ef705cdcd38d714d395f07988fd060b4e9 100644 (file)
@@ -75,7 +75,7 @@ final class RemoteTransactionContextSupport {
     }
 
     private OperationLimiter getOperationLimiter() {
-        return parent.getLimiter();
+        return transactionContextAdapter.getLimiter();
     }
 
     private TransactionIdentifier getIdentifier() {
@@ -160,7 +160,7 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
@@ -168,7 +168,7 @@ final class RemoteTransactionContextSupport {
             IllegalArgumentException exception = new IllegalArgumentException(String.format(
                     "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-            localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
         }
 
         transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
@@ -189,11 +189,11 @@ final class RemoteTransactionContextSupport {
         final TransactionContext ret;
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
-                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+            ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor,
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
         } else {
-            ret = new RemoteTransactionContext(transactionActor, getActorContext(),
-                isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+            ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(),
+                isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
         }
 
         if(parent.getType() == TransactionType.READ_ONLY) {
index e5130ed6dc78df0420bc811c4c18546df139db0b..6a542002d0a4293464edbca7e0f25fb83d17b0a7 100644 (file)
@@ -45,5 +45,11 @@ interface TransactionContext {
      * Implementations can rely on the wrapper calling this operation in a synchronized
      * block, so they do not need to ensure visibility of this state transition themselves.
      */
-    void operationHandoffComplete();
+    void operationHandOffComplete();
+
+    /**
+     * A TransactionContext that uses Operation limiting should return true else false
+     * @return
+     */
+    boolean usesOperationLimiting();
 }
index b08d4192b48c1ddd7b3cf3162782b68ef129c855..89a6a97fd68181aa140b38d367da79e0f2888693 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Collection;
 import java.util.List;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -37,6 +38,8 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
 
+    private final TransactionIdentifier identifier;
+
     /**
      * The resulting TransactionContext.
      */
@@ -44,8 +47,11 @@ class TransactionContextWrapper {
 
     private final OperationLimiter limiter;
 
-    TransactionContextWrapper(final OperationLimiter limiter) {
-        this.limiter = Preconditions.checkNotNull(limiter);
+    TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.limiter = new OperationLimiter(identifier,
+                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+                actorContext.getDatastoreContext().getOperationTimeoutInSeconds());
     }
 
     TransactionContext getTransactionContext() {
@@ -53,7 +59,7 @@ class TransactionContextWrapper {
     }
 
     TransactionIdentifier getIdentifier() {
-        return limiter.getIdentifier();
+        return identifier;
     }
 
     /**
@@ -106,7 +112,10 @@ class TransactionContextWrapper {
                 if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
-                    localTransactionContext.operationHandoffComplete();
+                    localTransactionContext.operationHandOffComplete();
+                    if(!localTransactionContext.usesOperationLimiting()){
+                        limiter.releaseAll();
+                    }
                     transactionContext = localTransactionContext;
                     break;
                 }
@@ -140,4 +149,10 @@ class TransactionContextWrapper {
 
         return promise.future();
     }
+
+    public OperationLimiter getLimiter() {
+        return limiter;
+    }
+
+
 }
index 1bda7810edb2d5c7ea43e570b3c49e51d1aa7deb..f7cb27b07f2c139c857e2c9308372678e7c29afd 100644 (file)
@@ -53,7 +53,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
-    private final OperationLimiter limiter;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
 
@@ -64,11 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.limiter = new OperationLimiter(getIdentifier(),
-            getActorContext().getTransactionOutstandingOperationLimit(),
-            getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
         LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
@@ -337,8 +331,4 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     ActorContext getActorContext() {
         return txContextFactory.getActorContext();
     }
-
-    OperationLimiter getLimiter() {
-        return limiter;
-    }
 }
index 733543aabdc262a7c3d5692a2629e6bc2b05982d..c44166396fcd469f39a7fbc2e0715e31620635c9 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationLimiter;
 import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -34,10 +35,10 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
 
     private final String transactionPath;
 
-    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
+    public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+        super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
index 6640898dea4c651d058d549289eb3fe02c2baacd..b07cf28a7d7f03fe5de3abd7ca8c24c666e6c539 100644 (file)
@@ -25,7 +25,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
@@ -92,7 +91,6 @@ public class ActorContext {
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
     private TransactionRateLimiter txRateLimiter;
-    private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
@@ -129,7 +127,6 @@ public class ActorContext {
             selfAddressHostPort = null;
         }
 
-        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     private void setCachedProperties() {
@@ -465,18 +462,6 @@ public class ActorContext {
         return builder.toString();
     }
 
-    /**
-     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
-     * should begin throttling the operations
-     *
-     * Parking reading this configuration here because we need to get to the actor system settings
-     *
-     * @return
-     */
-    public int getTransactionOutstandingOperationLimit(){
-        return transactionOutstandingOperationLimit;
-    }
-
     /**
      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
      * us to create a timer for pretty much anything.
index 09125f1b5959500e9953f57ad911e54e495fd288..71c440eeced5b67051f0b9f15e53696d0f9d0c18 100644 (file)
@@ -167,7 +167,7 @@ module distributed-datastore-provider {
          }
 
          leaf shard-batched-modification-count {
-            default 100;
+            default 1000;
             type non-zero-uint32-type;
             description "The number of transaction modification operations (put, merge, delete) to
                         batch before sending to the shard transaction actor. Batching improves
index 8e9d79ee5e6f54f24796455c129f4c7caded96fa..fcc18ed70251de9280fed8a0f32518a18f21ccc2 100644 (file)
@@ -66,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -98,13 +99,23 @@ public abstract class AbstractTransactionProxyTest {
                         public String findShard(YangInstanceIdentifier path) {
                             return "junk";
                         }
+                    }).put(
+                    "cars", new ShardStrategy() {
+                        @Override
+                        public String findShard(YangInstanceIdentifier path) {
+                            return "cars";
+                        }
                     }).build();
         }
 
         @Override
         public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
-            return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
-                    Optional.of("junk") : Optional.<String>absent();
+            if(TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
+                return Optional.of("junk");
+            } else if(CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)){
+                return Optional.of("cars");
+            }
+            return Optional.<String>absent();
         }
     };
 
@@ -151,7 +162,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         mockComponentFactory = TransactionContextFactory.create(mockActorContext);
 
@@ -342,8 +352,6 @@ public abstract class AbstractTransactionProxyTest {
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         return actorRef;
     }
 
index b04612ec85d9555a1feb1153a3533d73516d610e..0545bbae360ea0be22f1fc19fe7906b4e7a3220a 100644 (file)
@@ -36,7 +36,7 @@ public class LocalTransactionContextTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -54,7 +54,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
-        verify(limiter).release();
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -63,7 +62,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
-        verify(limiter).release();
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -71,7 +69,6 @@ public class LocalTransactionContextTest {
     public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         localTransactionContext.deleteData(yangInstanceIdentifier);
-        verify(limiter).release();
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
     }
 
@@ -82,7 +79,6 @@ public class LocalTransactionContextTest {
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
         localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
-        verify(limiter).release();
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -91,7 +87,6 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
-        verify(limiter).release();
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
@@ -104,7 +99,6 @@ public class LocalTransactionContextTest {
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
         doReturn(mockCohort).when(readWriteTransaction).ready();
         localTransactionContext.readyTransaction();
-        verify(limiter).onComplete(null, null);
         verify(readWriteTransaction).ready();
     }
 
index 40ce84b234d033d2cda630105c631459bbf91be5..7d49090f685e3cbc879c6fdecfb465fc60a1d379 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -25,21 +24,21 @@ public class OperationLimiterTest {
     public void testOnComplete() throws Exception {
         int permits = 10;
         OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
-        Semaphore semaphore = limiter.getSemaphore();
-        semaphore.acquire(permits);
+        limiter.acquire(permits);
         int availablePermits = 0;
 
         limiter.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new IllegalArgumentException());
-        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new BatchedModificationsReply(4));
         availablePermits += 4;
-        assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+        assertEquals("availablePermits", availablePermits, limiter.availablePermits());
     }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java
new file mode 100644 (file)
index 0000000..bc98f9a
--- /dev/null
@@ -0,0 +1,44 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionContextWrapperTest {
+
+    @Mock
+    TransactionIdentifier identifier;
+
+    @Mock
+    ActorContext actorContext;
+
+    @Mock
+    TransactionContext transactionContext;
+
+    TransactionContextWrapper transactionContextWrapper;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+        transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext);
+    }
+
+    @Test
+    public void testExecutePriorTransactionOperations(){
+        for(int i=0;i<100;i++) {
+            transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
+        }
+        assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
+
+        transactionContextWrapper.executePriorTransactionOperations(transactionContext);
+
+        assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
+    }
+}
\ No newline at end of file
index 26d51cbae3bf2df728ec1fa9e3322d0151b114b4..4301a72d180273d79e868b0c8de2acf19f916ff9 100644 (file)
@@ -798,6 +798,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         throttleOperation(operation, 1, true);
     }
 
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+    }
+
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
                 Optional.<DataTree>absent());
@@ -809,11 +813,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+        // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+        // we now allow one extra permit to be allowed for ready
+        doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+                shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
 
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
@@ -821,6 +828,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         if(shardFound) {
             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+                    when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
         } else {
             doReturn(Futures.failed(new Exception("not found")))
                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -845,9 +855,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expected, (end-start)), (end - start) > expected);
+                expectedCompletionTime, (end-start)),
+                ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
 
     }
 
@@ -859,8 +869,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -903,8 +911,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -952,7 +958,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -968,7 +973,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -986,7 +990,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testWriteThrottlingWhenShardNotFound(){
         // Confirm that there is no throttling when the Shard is not found
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1005,7 +1008,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1022,7 +1024,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1039,7 +1040,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardNotFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1056,7 +1056,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1074,7 +1073,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1122,7 +1120,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1137,7 +1134,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1326,7 +1322,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyThrottlingWithTwoTransactionContexts(){
-
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1340,11 +1335,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+                // Trying to write to Cars will cause another transaction context to get created
+                transactionProxy.write(CarsModel.BASE_PATH, carsNode);
 
+                // Now ready should block for both transaction contexts
                 transactionProxy.ready();
             }
-        }, 2, true);
+        }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
     }
 
     private void testModificationOperationBatching(TransactionType type) throws Exception {
@@ -1526,8 +1523,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);