BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 26d51cbae3bf2df728ec1fa9e3322d0151b114b4..4301a72d180273d79e868b0c8de2acf19f916ff9 100644 (file)
@@ -798,6 +798,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         throttleOperation(operation, 1, true);
     }
 
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+    }
+
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
                 Optional.<DataTree>absent());
@@ -809,11 +813,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+        // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+        // we now allow one extra permit to be allowed for ready
+        doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+                shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
 
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
@@ -821,6 +828,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         if(shardFound) {
             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+                    when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
         } else {
             doReturn(Futures.failed(new Exception("not found")))
                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -845,9 +855,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expected, (end-start)), (end - start) > expected);
+                expectedCompletionTime, (end-start)),
+                ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
 
     }
 
@@ -859,8 +869,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -903,8 +911,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -952,7 +958,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -968,7 +973,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -986,7 +990,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testWriteThrottlingWhenShardNotFound(){
         // Confirm that there is no throttling when the Shard is not found
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1005,7 +1008,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1022,7 +1024,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1039,7 +1040,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardNotFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1056,7 +1056,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1074,7 +1073,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1122,7 +1120,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1137,7 +1134,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1326,7 +1322,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyThrottlingWithTwoTransactionContexts(){
-
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1340,11 +1335,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+                // Trying to write to Cars will cause another transaction context to get created
+                transactionProxy.write(CarsModel.BASE_PATH, carsNode);
 
+                // Now ready should block for both transaction contexts
                 transactionProxy.ready();
             }
-        }, 2, true);
+        }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
     }
 
     private void testModificationOperationBatching(TransactionType type) throws Exception {
@@ -1526,8 +1523,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);