BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 8909e1d3127fe7e49e986470f95d4a04533cf206..4301a72d180273d79e868b0c8de2acf19f916ff9 100644 (file)
@@ -798,20 +798,29 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         throttleOperation(operation, 1, true);
     }
 
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+    }
+
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                Optional.<DataTree>absent());
     }
 
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional);
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                dataTreeOptional);
     }
 
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+        // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+        // we now allow one extra permit to be allowed for ready
+        doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+                shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
 
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
@@ -819,6 +828,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         if(shardFound) {
             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+            doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+                    when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
         } else {
             doReturn(Futures.failed(new Exception("not found")))
                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -843,9 +855,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
-                expected, (end-start)), (end - start) > expected);
+                expectedCompletionTime, (end-start)),
+                ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
 
     }
 
@@ -857,8 +869,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -901,8 +911,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
-        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(actorSystem.actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
@@ -950,7 +958,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -966,7 +973,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -984,7 +990,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testWriteThrottlingWhenShardNotFound(){
         // Confirm that there is no throttling when the Shard is not found
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1003,7 +1008,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1020,7 +1024,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1037,7 +1040,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardNotFound(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1054,7 +1056,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1072,7 +1073,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1120,7 +1120,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletionForLocalShard(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1135,7 +1134,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletion(){
-        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1324,7 +1322,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyThrottlingWithTwoTransactionContexts(){
-
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1338,11 +1335,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+                // Trying to write to Cars will cause another transaction context to get created
+                transactionProxy.write(CarsModel.BASE_PATH, carsNode);
 
+                // Now ready should block for both transaction contexts
                 transactionProxy.ready();
             }
-        }, 2, true);
+        }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
     }
 
     private void testModificationOperationBatching(TransactionType type) throws Exception {
@@ -1524,8 +1523,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);