Deprecate 3PC protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index 75c93dd5d2fd2de9521d7540da7169fe656c71e9..0ab92dda893c99e52fe548079f37e89fa6ba6f1d 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
@@ -7,10 +15,14 @@ import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
@@ -27,12 +39,11 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -43,11 +54,35 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
     private Future<ActorSelection> newCohort() {
@@ -86,15 +121,18 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType));
+                isA(requestType), any(Timeout.class));
+
+        doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+                .when(actorContext).getTransactionCommitOperationTimeout();
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType));
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
     }
 
-    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+    private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
 
         try {
             future.get(5, TimeUnit.SECONDS);
@@ -109,21 +147,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES);
+        setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.NO);
+        setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
 
         future = proxy.canCommit();
 
         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CanCommitTransaction.class);
     }
 
     @Test
@@ -131,14 +167,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+        setupMockActorContext(CanCommitTransaction.class,
+                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CanCommitTransaction.class);
     }
 
     @Test
@@ -146,14 +182,17 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
+        setupMockActorContext(CanCommitTransaction.class,
+                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
+                CanCommitTransactionReply.yes(CURRENT_VERSION));
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+        Boolean actual = future.get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+        assertEquals("canCommit", false, actual);
+
+        verifyCohortInvocations(2, CanCommitTransaction.class);
     }
 
     @Test(expected = TestException.class)
@@ -161,7 +200,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
+        setupMockActorContext(CanCommitTransaction.class, new TestException());
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
@@ -171,8 +210,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply());
+        setupMockActorContext(CanCommitTransaction.class,
+                new CommitTransactionReply());
 
         proxy.canCommit().get(5, TimeUnit.SECONDS);
     }
@@ -185,7 +224,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.canCommit());
         } finally {
-            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+            verifyCohortInvocations(0, CanCommitTransaction.class);
         }
     }
 
@@ -194,9 +233,6 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         // Precommit is currently a no-op
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply());
-
         proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
@@ -204,23 +240,23 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     public void testAbort() throws Exception {
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
+        setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
 
         proxy.abort().get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(1, AbortTransaction.class);
     }
 
     @Test
     public void testAbortWithFailure() throws Exception {
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+        setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
 
         // The exception should not get propagated.
         proxy.abort().get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(1, AbortTransaction.class);
     }
 
     @Test
@@ -231,7 +267,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         // The exception should not get propagated.
         proxy.abort().get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(0, AbortTransaction.class);
     }
 
     @Test
@@ -239,12 +275,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+        setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
                 new CommitTransactionReply());
 
         proxy.commit().get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CommitTransaction.class);
     }
 
     @Test(expected = TestException.class)
@@ -252,7 +288,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+        setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
                 new TestException());
 
         propagateExecutionExceptionCause(proxy.commit());
@@ -263,7 +299,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+        setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
 
         proxy.commit().get(5, TimeUnit.SECONDS);
     }
@@ -276,8 +312,10 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
-            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+
+            verifyCohortInvocations(0, CommitTransaction.class);
         }
+
     }
 
     @Test
@@ -285,20 +323,33 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+        setupMockActorContext(CanCommitTransaction.class,
+                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
 
-        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply(), new PreCommitTransactionReply());
-
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+        setupMockActorContext(CommitTransaction.class,
                 new CommitTransactionReply(), new CommitTransactionReply());
 
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CanCommitTransaction.class);
+        verifyCohortInvocations(2, CommitTransaction.class);
+
+    }
+
+    @Test
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
         proxy.canCommit().get(5, TimeUnit.SECONDS);
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
-        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 }