BUG 2762 : Compute rate limit based on a more lenient algorithm
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index 4eca5671f626543b5c9ca8052f1427945c72a302..647b6e7b542508953bb9750555aaf193c0ba2864 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
-
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import junit.framework.Assert;
-
-import org.junit.After;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import scala.concurrent.Future;
 
-import java.util.Arrays;
-import java.util.concurrent.Executors;
+public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
-import static org.junit.Assert.assertNotNull;
+    @SuppressWarnings("serial")
+    static class TestException extends RuntimeException {
+    }
 
-public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+    @Mock
+    private ActorContext actorContext;
+
+    @Mock
+    private DatastoreContext datastoreContext;
 
-    private ThreePhaseCommitCohortProxy proxy;
-    private Props props;
-    private ActorRef actorRef;
-    private MockActorContext actorContext;
-    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
-                        Executors.newSingleThreadExecutor());
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
 
     @Before
-    public void setUp(){
-        props = Props.create(MessageCollectorActor.class);
-        actorRef = getSystem().actorOf(props);
-        actorContext = new MockActorContext(this.getSystem());
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
 
-        proxy =
-            new ThreePhaseCommitCohortProxy(actorContext,
-                Arrays.asList(actorRef.path()), "txn-1", executor);
+        doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
+    }
 
+    private Future<ActorSelection> newCohort() {
+        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+        ActorSelection actorSelection = getSystem().actorSelection(path);
+        return Futures.successful(actorSelection);
     }
 
-    @After
-    public void tearDown() {
-        executor.shutdownNow();
+    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+        for(int i = 1; i <= nCohorts; i++) {
+            cohortFutures.add(newCohort());
+        }
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
+    }
+
+    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+            throws Exception {
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+        cohortFutures.add(newCohort());
+        cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
+    }
+
+    private void setupMockActorContext(Class<?> requestType, Object... responses) {
+        Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
+                .failed((Throwable) responses[0]) : Futures
+                .successful(((SerializableMessage) responses[0]).toSerializable()));
+
+        for(int i = 1; i < responses.length; i++) {
+            stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
+                    .failed((Throwable) responses[i]) : Futures
+                    .successful(((SerializableMessage) responses[i]).toSerializable()));
+        }
+
+        stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
+                isA(requestType), any(Timeout.class));
+    }
+
+    private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
+        verify(actorContext, times(nCohorts)).executeOperationAsync(
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
+    }
+
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            throw e.getCause();
+        }
     }
 
     @Test
-    public void testCanCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
+    public void testCanCommitWithOneCohort() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        Assert.assertTrue(future.get().booleanValue());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                CanCommitTransactionReply.NO);
+
+        future = proxy.canCommit();
 
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test
-    public void testPreCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
+    public void testCanCommitWithMultipleCohorts() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+
+        ListenableFuture<Boolean> future = proxy.canCommit();
+
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test
+    public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(3);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
+
+        ListenableFuture<Boolean> future = proxy.canCommit();
+
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+
+        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCanCommitWithExceptionFailure() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
+
+        propagateExecutionExceptionCause(proxy.canCommit());
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testCanCommitWithInvalidResponseType() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply());
 
-        ListenableFuture<Void> future = proxy.preCommit();
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.canCommit());
+        } finally {
+            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+        }
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        // Precommit is currently a no-op
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        future.get();
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply());
 
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
     @Test
     public void testAbort() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
+
+        proxy.abort().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test
+    public void testAbortWithFailure() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        ListenableFuture<Void> future = proxy.abort();
+        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
 
-        future.get();
+        // The exception should not get propagated.
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
+        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test
+    public void testAbortWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        // The exception should not get propagated.
+        proxy.abort().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test
     public void testCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
 
-        ListenableFuture<Void> future = proxy.commit();
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+                new CommitTransactionReply());
+
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithFailure() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+                new TestException());
+
+        propagateExecutionExceptionCause(proxy.commit());
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testCommitWithInvalidResponseType() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+
+        proxy.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.commit());
+        } finally {
+
+            verify(actorContext, never()).setTxCreationLimit(anyLong());
+            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+        }
+
+    }
+
+    @Test
+    public void testAllThreePhasesSuccessful() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+                new CommitTransactionReply(), new CommitTransactionReply());
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
 
-        future.get();
     }
 
     @Test
-    public void testGetCohortPaths() throws Exception {
-        assertNotNull(proxy.getCohortPaths());
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verify(actorContext, never()).setTxCreationLimit(anyLong());
     }
 }