BUG 2518 : Throttle operations in a transaction 55/14155/4
authorMoiz Raja <moraja@cisco.com>
Wed, 14 Jan 2015 23:21:35 +0000 (15:21 -0800)
committerMoiz Raja <moraja@cisco.com>
Mon, 26 Jan 2015 10:49:35 +0000 (11:49 +0100)
In some use cases a single transaction may do a lot of operations
in a short period of time. This happens for example when testing
the bgp-plugin. To ensure that clients do not overwhelm the datastore
this patch throttles operations on the transaction proxy side and
prevents the client from sending more operations than can be handled
by the ShardTransaction actor in a reasonable amount of time.

The throttling serves as a back-pressure mechanism. Akka recommends
that for back pressure we use a bounded mailbox with an adequate
push timeout. We are doing this. However the akka's behavior on the
timeout expiring is to send the throttled message to dead letters.
So to properly do what akka expects us to do we would need to watch
dead letters and use that as an indication that we need back pressure
which I think is inadequate for our purpose and thus the Semaphore
based throttling.

Change-Id: Ib1a0f128ffde009a82b8cd67001203e0b959fdf5
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index f34e88f..9f48ef9 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
+    private final Semaphore operationLimiter;
+    private final OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+        this.operationCompleter = new OperationCompleter(operationLimiter);
+
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
 
@@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} read {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
             @Override
@@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} exists {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
             @Override
@@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 "Transaction is sealed - further modifications are not allowed");
     }
 
+    private void throttleOperation() {
+        throttleOperation(1);
+    }
+
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+            }
+        } catch (InterruptedException e) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+            }
+        }
+    }
+
+
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} write {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} merge {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} delete {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
+        throttleOperation(txFutureCallbackMap.size());
+
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
@@ -668,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                     localTransactionContext = createValidTransactionContext(
                             CreateTransactionReply.fromSerializable(response));
@@ -676,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
@@ -713,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
         }
     }
 
@@ -755,35 +793,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
         private final ActorContext actorContext;
-        private final SchemaContext schemaContext;
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
         private final short remoteTransactionVersion;
+        private final OperationCompleter operationCompleter;
+
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, short remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
-            this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
             this.remoteTransactionVersion = remoteTransactionVersion;
+            this.operationCompleter = operationCompleter;
         }
 
+        private Future<Object> completeOperation(Future<Object> operationFuture){
+            operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+            return operationFuture;
+        }
+
+
         private ActorSelection getActor() {
             return actor;
         }
 
         private Future<Object> executeOperationAsync(SerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
         }
 
         private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion));
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                    msg.toSerializable(remoteTransactionVersion)));
         }
 
         @Override
@@ -1057,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Throwable failure;
+        private final Semaphore operationLimiter;
 
-        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
             super(identifier);
             this.failure = failure;
+            this.operationLimiter = operationLimiter;
         }
 
         @Override
@@ -1071,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public Future<ActorSelection> readyTransaction() {
             LOG.debug("Tx {} readyTransaction called", identifier);
+            operationLimiter.release();
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} readData called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -1101,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
+
+    private static class OperationCompleter extends OnComplete<Object> {
+        private final Semaphore operationLimiter;
+        OperationCompleter(Semaphore operationLimiter){
+            this.operationLimiter = operationLimiter;
+        }
+
+        @Override
+        public void onComplete(Throwable throwable, Object o){
+            this.operationLimiter.release();
+        }
+    }
 }
index f217d05..c9fdf38 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -21,6 +22,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -45,8 +47,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static akka.pattern.Patterns.ask;
-
 /**
  * The ActorContext class contains utility methods which could be used by
  * non-actors (like DistributedDataStore) to work with actors a little more
@@ -84,6 +84,7 @@ public class ActorContext {
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final int transactionOutstandingOperationLimit;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -110,6 +111,8 @@ public class ActorContext {
         } else {
             selfAddressHostPort = null;
         }
+
+        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -431,4 +434,16 @@ public class ActorContext {
 
         return builder.toString();
     }
+
+    /**
+     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+     * should begin throttling the operations
+     *
+     * Parking reading this configuration here because we need to get to the actor system settings
+     *
+     * @return
+     */
+    public int getTransactionOutstandingOperationLimit(){
+        return transactionOutstandingOperationLimit;
+    }
 }
index ce0547c..dd37371 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -19,19 +18,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class TransactionChainProxyTest {
-    ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+    ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
     @Before
     public void setUp() {
-        doReturn(schemaContext).when(actorContext).getSchemaContext();
+        actorContext = new MockActorContext(getSystem());
+        actorContext.setSchemaContext(schemaContext);
     }
 
     @SuppressWarnings("resource")
index 5e53b29..79edd19 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -118,7 +121,7 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
@@ -126,6 +129,7 @@ public class TransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -358,6 +362,10 @@ public class TransactionProxyTest {
         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
+    private Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
     private Future<MergeDataReply> mergeDataReply() {
         return Futures.successful(new MergeDataReply());
     }
@@ -395,6 +403,10 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
         return actorRef;
     }
 
@@ -1222,4 +1234,425 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
     }
+
+    private static interface TransactionProxyOperation {
+        void run(TransactionProxy transactionProxy);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation) {
+        throttleOperation(operation, 1, true);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took less time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+    }
+
+    private void completeOperation(TransactionProxyOperation operation){
+        completeOperation(operation, true);
+    }
+
+    private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took more time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+    }
+
+    public void testWriteThrottling(boolean shardFound){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, 1, shardFound);
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardFound(){
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardNotFound(){
+        // Confirm that there is no throttling when the Shard is not found
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, false);
+
+    }
+
+
+    @Test
+    public void testWriteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testMergeCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+
+    }
+
+    @Test
+    public void testDeleteThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+
+    @Test
+    public void testDeleteThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testDeleteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testReadCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testExistsCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadyThrottling(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.ready();
+            }
+        });
+    }
+
+    @Test
+    public void testReadyThrottlingWithTwoTransactionContexts(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+                NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(carsNode));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+                transactionProxy.ready();
+            }
+        }, 2, true);
+    }
 }
index fcb0324..e4ab969 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -21,10 +23,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class ActorContextTest extends AbstractActorTest{
 
     private static class MockShardManager extends UntypedActor {
@@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testResolvePathForRemoteActor() {
         ActorContext actorContext =
-                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
                         ClusterWrapper.class),
                         mock(Configuration.class));
 
index e571e3a..67fa096 100644 (file)
@@ -20,6 +20,7 @@ public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
           "test");
+
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
   public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");