Fix intermittent unit test failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index b013515f2595950cba669ac08dfdb1a8eefe37fa..3014f7e15785fac4f3ff06f39319729bb660070c 100644 (file)
@@ -1,44 +1,54 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import akka.actor.ActorPath;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
 import akka.dispatch.Futures;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
-import com.google.common.collect.Lists;
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import scala.concurrent.Future;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -46,12 +56,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     static class TestException extends RuntimeException {
     }
 
-    @Mock
     private ActorContext actorContext;
 
-    @Mock
-    private DatastoreContext datastoreContext;
-
     @Mock
     private Timer commitTimer;
 
@@ -61,291 +67,381 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private Snapshot commitSnapshot;
 
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(getSystem()).when(actorContext).getActorSystem();
-        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
-        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
-        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
-        doReturn(commitTimerContext).when(commitTimer).time();
-        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
-        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
-        doReturn(10.0).when(actorContext).getTxCreationLimit();
-    }
+        actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
+                new MockClusterWrapper(), new MockConfiguration(),
+                DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
+                    @Override
+                    public Timer getOperationTimer(String operationName) {
+                        return commitTimer;
+                    }
 
-    private Future<ActorSelection> newCohort() {
-        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
-        ActorSelection actorSelection = getSystem().actorSelection(path);
-        return Futures.successful(actorSelection);
-    }
+                    @Override
+                    public double getTxCreationLimit() {
+                        return 10.0;
+                    }
+                };
 
-    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-        for(int i = 1; i <= nCohorts; i++) {
-            cohortFutures.add(newCohort());
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
         }
-
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
-    }
-
-    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
-            throws Exception {
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-        cohortFutures.add(newCohort());
-        cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
-
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
     }
 
-    private void setupMockActorContext(Class<?> requestType, Object... responses) {
-        Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
-                .failed((Throwable) responses[0]) : Futures
-                .successful(((SerializableMessage) responses[0]).toSerializable()));
-
-        for(int i = 1; i < responses.length; i++) {
-            stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
-                    .failed((Throwable) responses[i]) : Futures
-                    .successful(((SerializableMessage) responses[i]).toSerializable()));
-        }
-
-        stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(Timeout.class));
-    }
+    @Test
+    public void testCanCommitYesWithOneCohort() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
 
-    private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
-        verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(Timeout.class));
-    }
-
-    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
-
-        try {
-            future.get(5, TimeUnit.SECONDS);
-            fail("Expected ExecutionException");
-        } catch(ExecutionException e) {
-            throw e.getCause();
-        }
+        verifyCanCommit(proxy.canCommit(), true);
+        verifyCohortActors();
     }
 
     @Test
-    public void testCanCommitWithOneCohort() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES);
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.NO);
+    public void testCanCommitNoWithOneCohort() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
 
-        future = proxy.canCommit();
-
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
-
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCanCommit(proxy.canCommit(), false);
+        verifyCohortActors();
     }
 
     @Test
-    public void testCanCommitWithMultipleCohorts() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
-
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+    public void testCanCommitYesWithTwoCohorts() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifyCohortActors();
     }
 
     @Test
-    public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(3);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
-
-        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+    public void testCanCommitNoWithThreeCohorts() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.no(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), false);
+        verifyCohortActors();
     }
 
     @Test(expected = TestException.class)
     public void testCanCommitWithExceptionFailure() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCanCommitWithInvalidResponseType() throws Exception {
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanCommitWithInvalidResponseType() throws Throwable {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply());
-
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        propagateExecutionExceptionCause(proxy.canCommit());
     }
 
     @Test(expected = TestException.class)
-    public void testCanCommitWithFailedCohortPath() throws Throwable {
+    public void testCanCommitWithFailedCohortFuture() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1")),
+                newCohortInfoWithFailedFuture(new TestException()),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
 
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
-        try {
-            propagateExecutionExceptionCause(proxy.canCommit());
-        } finally {
-            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
-        }
+        propagateExecutionExceptionCause(proxy.canCommit());
     }
 
     @Test
-    public void testPreCommit() throws Exception {
-        // Precommit is currently a no-op
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+    public void testAllThreePhasesSuccessful() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithExceptionFailure() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(new TestException())));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        propagateExecutionExceptionCause(proxy.commit());
+    }
 
-        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply());
+    @Test(expected = IllegalArgumentException.class)
+    public void testCommitWithInvalidResponseType() throws Throwable {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit("invalid"))), "txn-1");
 
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     @Test
     public void testAbort() throws Exception {
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
+                        AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
 
-        proxy.abort().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
     public void testAbortWithFailure() throws Exception {
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
 
         // The exception should not get propagated.
-        proxy.abort().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
-    public void testAbortWithFailedCohortPath() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
-        // The exception should not get propagated.
-        proxy.abort().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+    public void testAbortWithFailedCohortFuture() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfoWithFailedFuture(new TestException()),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
-    public void testCommit() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
-                new CommitTransactionReply());
-
-        proxy.commit().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+    public void testWithNoCohorts() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
+                Collections.<CohortInfo>emptyList(), "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
     }
 
-    @Test(expected = TestException.class)
-    public void testCommitWithFailure() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
-                new TestException());
-
-        propagateExecutionExceptionCause(proxy.commit());
+    @Test
+    public void testBackwardsCompatibilityWithPreBoron() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
+                                CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
+                        expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
+                                CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
+                        DataStoreVersions.LITHIUM_VERSION));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCommitWithInvalidResponseType() throws Exception {
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            verifyCohortActors();
+            throw e.getCause();
+        }
+    }
 
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+    private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
+        TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
+                withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
+        cohortActors.add(actor);
+        return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
+            @Override
+            public Short get() {
+                return version;
+            }
+        });
+    }
 
-        proxy.commit().get(5, TimeUnit.SECONDS);
+    private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+        return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
+            @Override
+            public Short get() {
+                return CURRENT_VERSION;
+            }
+        });
     }
 
-    @Test(expected = TestException.class)
-    public void testCommitWithFailedCohortPath() throws Throwable {
+    private CohortInfo newCohortInfo(CohortActor.Builder builder) {
+        return newCohortInfo(builder, CURRENT_VERSION);
+    }
 
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+    private void verifyCohortActors() {
+        for(TestActorRef<CohortActor> actor: cohortActors) {
+            actor.underlyingActor().verify();
+        }
+    }
 
+    private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
         try {
-            propagateExecutionExceptionCause(proxy.commit());
-        } finally {
-
-            verify(actorContext, never()).setTxCreationLimit(anyLong());
-            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+            return future.get(5, TimeUnit.SECONDS);
+        } catch(Exception e) {
+            verifyCohortActors();
+            throw e;
         }
-
     }
 
-    @Test
-    public void testAllThreePhasesSuccessful() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
-
-        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply(), new PreCommitTransactionReply());
-
-        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
-                new CommitTransactionReply(), new CommitTransactionReply());
+    private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
+        Boolean actual = verifySuccessfulFuture(future);
+        assertEquals("canCommit", expected, actual);
+    }
 
-        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+    private static class CohortActor extends UntypedActor {
+        private final Builder builder;
+        private final AtomicInteger canCommitCount = new AtomicInteger();
+        private final AtomicInteger commitCount = new AtomicInteger();
+        private final AtomicInteger abortCount = new AtomicInteger();
+        private volatile AssertionError assertionError;
 
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
-        proxy.commit().get(5, TimeUnit.SECONDS);
+        private CohortActor(Builder builder) {
+            this.builder = builder;
+        }
 
-        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
-        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+        @Override
+        public void onReceive(Object message) {
+            if(CanCommitTransaction.isSerializedType(message)) {
+                canCommitCount.incrementAndGet();
+                onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
+                        builder.expCanCommitType, builder.canCommitReply);
+            } else if(CommitTransaction.isSerializedType(message)) {
+                commitCount.incrementAndGet();
+                onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
+                        builder.expCommitType, builder.commitReply);
+            } else if(AbortTransaction.isSerializedType(message)) {
+                abortCount.incrementAndGet();
+                onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
+                        builder.expAbortType, builder.abortReply);
+            } else {
+                assertionError = new AssertionError("Unexpected message " + message);
+            }
+        }
 
-        // Verify that the creation limit was changed to 0.5 (based on setup)
-        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
-    }
+        private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
+                Class<?> expType, Object reply) {
+            try {
+                assertNotNull("Unexpected " + name, expType);
+                assertEquals(name + " type", expType, rawMessage.getClass());
+                assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
+
+                if(reply instanceof Throwable) {
+                    getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
+                } else {
+                    getSender().tell(reply, self());
+                }
+            } catch(AssertionError e) {
+                assertionError = e;
+            }
+        }
 
-    @Test
-    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+        void verify() {
+            if(assertionError != null) {
+                throw assertionError;
+            }
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+            if(builder.expCanCommitType != null) {
+                assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
+            }
 
-        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+            if(builder.expCommitType != null) {
+                assertEquals("CommitTransaction count", 1, commitCount.get());
+            }
 
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
-        proxy.commit().get(5, TimeUnit.SECONDS);
+            if(builder.expAbortType != null) {
+                assertEquals("AbortTransaction count", 1, abortCount.get());
+            }
+        }
 
-        verify(actorContext, never()).setTxCreationLimit(anyLong());
+        static class Builder {
+            private Class<?> expCanCommitType;
+            private Class<?> expCommitType;
+            private Class<?> expAbortType;
+            private Object canCommitReply;
+            private Object commitReply;
+            private Object abortReply;
+            private final String transactionId;
+
+            Builder(String transactionId) {
+                this.transactionId = transactionId;
+            }
+
+            Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
+                this.expCanCommitType = expCanCommitType;
+                this.canCommitReply = canCommitReply;
+                return this;
+            }
+
+            Builder expectCanCommit(Object canCommitReply) {
+                return expectCanCommit(CanCommitTransaction.class, canCommitReply);
+            }
+
+            Builder expectCommit(Class<?> expCommitType, Object commitReply) {
+                this.expCommitType = expCommitType;
+                this.commitReply = commitReply;
+                return this;
+            }
+
+            Builder expectCommit(Object commitReply) {
+                return expectCommit(CommitTransaction.class, commitReply);
+            }
+
+            Builder expectAbort(Class<?> expAbortType, Object abortReply) {
+                this.expAbortType = expAbortType;
+                this.abortReply = abortReply;
+                return this;
+            }
+
+            Builder expectAbort(Object abortReply) {
+                return expectAbort(AbortTransaction.class, abortReply);
+            }
+
+            Props props() {
+                return Props.create(CohortActor.class, this);
+            }
+        }
     }
 }