BUG 2676 : Apply backpressure when creating transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index 75c93dd5d2fd2de9521d7540da7169fe656c71e9..d2396e0524f340844ea5f7c0e35dfd6cad0b4806 100644 (file)
@@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
@@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
     private Future<ActorSelection> newCohort() {
@@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType));
+                isA(requestType), any(Timeout.class));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType));
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
@@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
+
+            verify(actorContext, never()).setTxCreationLimit(anyLong());
             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
         }
+
     }
 
     @Test
@@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
                 new CommitTransactionReply(), new CommitTransactionReply());
 
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
         proxy.canCommit().get(5, TimeUnit.SECONDS);
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+        // Verify that the creation limit was changed to 0.5 (based on setup)
+        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+    }
+
+    @Test
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verify(actorContext, never()).setTxCreationLimit(anyLong());
     }
 }