X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxyTest.java;h=45319712ae87bfd3c30cc9704bddf66d4e25fea7;hb=HEAD;hp=0ab92dda893c99e52fe548079f37e89fa6ba6f1d;hpb=5a4560d475f0ed328275f1a5c7a5dae292acfb02;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 0ab92dda89..e2b3872d86 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -5,351 +5,402 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.lenient; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; -import akka.actor.ActorPath; + import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.UntypedAbstractActor; +import akka.dispatch.Dispatchers; import akka.dispatch.Futures; -import akka.util.Timeout; +import akka.testkit.TestActorRef; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.stubbing.Stubber; +import org.mockito.junit.MockitoJUnitRunner; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; +@Deprecated(since = "9.0.0", forRemoval = true) +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { - - @SuppressWarnings("serial") static class TestException extends RuntimeException { - } + private static final long serialVersionUID = 1L; - @Mock - private ActorContext actorContext; + } - @Mock - private DatastoreContext datastoreContext; + private ActorUtils actorUtils; @Mock private Timer commitTimer; - @Mock private Timer.Context commitTimerContext; - @Mock private Snapshot commitSnapshot; + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + private final TransactionIdentifier tx = nextTransactionId(); + @Before public void setUp() { - MockitoAnnotations.initMocks(this); - - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); - doReturn(datastoreContext).when(actorContext).getDatastoreContext(); - doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); - doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); - doReturn(commitTimerContext).when(commitTimer).time(); - doReturn(commitSnapshot).when(commitTimer).getSnapshot(); - for(int i=1;i<11;i++){ + actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), + new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(final String operationName) { + return commitTimer; + } + + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + + lenient().doReturn(commitTimerContext).when(commitTimer).time(); + lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + for (int i = 1; i < 11; i++) { // Keep on increasing the amount of time it takes to complete transaction for each tenth of a // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - doReturn(10.0).when(actorContext).getTxCreationLimit(); - } - - private Future newCohort() { - ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path(); - ActorSelection actorSelection = getSystem().actorSelection(path); - return Futures.successful(actorSelection); - } - - private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception { - List> cohortFutures = Lists.newArrayList(); - for(int i = 1; i <= nCohorts; i++) { - cohortFutures.add(newCohort()); + lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); } - private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath() - throws Exception { - List> cohortFutures = Lists.newArrayList(); - cohortFutures.add(newCohort()); - cohortFutures.add(Futures.failed(new TestException())); + @Test + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))), + tx); - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } - private void setupMockActorContext(Class requestType, Object... responses) { - Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures - .failed((Throwable) responses[0]) : Futures - .successful(((SerializableMessage) responses[0]).toSerializable())); - - for(int i = 1; i < responses.length; i++) { - stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures - .failed((Throwable) responses[i]) : Futures - .successful(((SerializableMessage) responses[i]).toSerializable())); - } - - stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType), any(Timeout.class)); - - doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) - .when(actorContext).getTransactionCommitOperationTimeout(); - } + @Test + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))), + tx); - private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType), any(Timeout.class)); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } - private static void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { - - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch(ExecutionException e) { - throw e.getCause(); - } + @Test + public void testCanCommitYesWithTwoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))), + tx); + + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithOneCohort() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION)); - - future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.class); + public void testCanCommitNoWithThreeCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx))), tx); + + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohorts() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); + public void testCanCommitWithExceptionFailure() { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx); - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.class); + propagateExecutionExceptionCause(proxy.canCommit(), TestException.class); } @Test - public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(3); - - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION), - CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); + public void testCanCommitWithInvalidResponseType() { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx); - Boolean actual = future.get(5, TimeUnit.SECONDS); - - assertEquals("canCommit", false, actual); - - verifyCohortInvocations(2, CanCommitTransaction.class); + assertEquals("Unexpected response type class java.lang.String", + propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class)); } - @Test(expected = TestException.class) - public void testCanCommitWithExceptionFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.class, new TestException()); + @Test + public void testCanCommitWithFailedCohortFuture() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx)), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder(tx))), tx); - propagateExecutionExceptionCause(proxy.canCommit()); + propagateExecutionExceptionCause(proxy.canCommit(), TestException.class); } - @Test(expected = ExecutionException.class) - public void testCanCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.class, - new CommitTransactionReply()); - - proxy.canCommit().get(5, TimeUnit.SECONDS); + @Test + public void testAllThreePhasesSuccessful() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCanCommitWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - try { - propagateExecutionExceptionCause(proxy.canCommit()); - } finally { - verifyCohortInvocations(0, CanCommitTransaction.class); - } + @Test + public void testCommitWithExceptionFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(new TestException()))), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit(), TestException.class); } @Test - public void testPreCommit() throws Exception { - // Precommit is currently a no-op - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - proxy.preCommit().get(5, TimeUnit.SECONDS); + public void testCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit("invalid"))), + tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + assertEquals("Unexpected response type class java.lang.String", + propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class)); } @Test public void testAbort() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), + tx); - setupMockActorContext(AbortTransaction.class, new AbortTransactionReply()); - - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.class); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test public void testAbortWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.class, new RuntimeException("mock")); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx); // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.class); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testAbortWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); + public void testAbortWithFailedCohortFuture() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx); - verifyCohortInvocations(0, AbortTransaction.class); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(), - new CommitTransactionReply()); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx); - proxy.commit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(2, CommitTransaction.class); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCommitWithFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(), - new TestException()); - - propagateExecutionExceptionCause(proxy.commit()); + private String propagateExecutionExceptionCause(final ListenableFuture future, + final Class expected) { + final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause(); + verifyCohortActors(); + assertThat(ex, instanceOf(expected)); + return ex.getMessage(); } - @Test(expected = ExecutionException.class) - public void testCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CommitTransaction.class, new AbortTransactionReply()); + private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version); + } - proxy.commit().get(5, TimeUnit.SECONDS); + private CohortInfo newCohortInfo(final CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); } - @Test(expected = TestException.class) - public void testCommitWithFailedCohortPath() throws Throwable { + private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) { + return new CohortInfo(Futures.failed(failure), () -> CURRENT_VERSION); + } - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + private void verifyCohortActors() { + for (TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } + @SuppressWarnings("checkstyle:IllegalCatch") + private T verifySuccessfulFuture(final ListenableFuture future) throws Exception { try { - propagateExecutionExceptionCause(proxy.commit()); - } finally { - - verifyCohortInvocations(0, CommitTransaction.class); + return future.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + verifyCohortActors(); + throw e; } - } - @Test - public void testAllThreePhasesSuccessful() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION)); - - setupMockActorContext(CommitTransaction.class, - new CommitTransactionReply(), new CommitTransactionReply()); + private void verifyCanCommit(final ListenableFuture future, final boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + private static class CohortActor extends UntypedAbstractActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + CohortActor(final Builder builder) { + this.builder = builder; + } - verifyCohortInvocations(2, CanCommitTransaction.class); - verifyCohortInvocations(2, CommitTransaction.class); + @Override + public void onReceive(final Object message) { + if (CanCommitTransaction.isSerializedType(message)) { + canCommitCount.incrementAndGet(); + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + } else if (CommitTransaction.isSerializedType(message)) { + commitCount.incrementAndGet(); + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + } else if (AbortTransaction.isSerializedType(message)) { + abortCount.incrementAndGet(); + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } - } + private void onMessage(final String name, final Object rawMessage, + final AbstractThreePhaseCommitMessage actualMessage, final Class expType, final Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId()); + + if (reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch (AssertionError e) { + assertionError = e; + } + } - @Test - public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + void verify() { + if (assertionError != null) { + throw assertionError; + } - ThreePhaseCommitCohortProxy proxy = setupProxy(0); + if (builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + if (builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + if (builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final TransactionIdentifier transactionId; + + Builder(final TransactionIdentifier transactionId) { + this.transactionId = requireNonNull(transactionId); + } + + Builder expectCanCommit(final Class newExpCanCommitType, final Object newCanCommitReply) { + expCanCommitType = newExpCanCommitType; + canCommitReply = newCanCommitReply; + return this; + } + + Builder expectCanCommit(final Object newCanCommitReply) { + return expectCanCommit(CanCommitTransaction.class, newCanCommitReply); + } + + Builder expectCommit(final Class newExpCommitType, final Object newCommitReply) { + expCommitType = newExpCommitType; + commitReply = newCommitReply; + return this; + } + + Builder expectCommit(final Object newCommitReply) { + return expectCommit(CommitTransaction.class, newCommitReply); + } + + Builder expectAbort(final Class newExpAbortType, final Object newAbortReply) { + expAbortType = newExpAbortType; + abortReply = newAbortReply; + return this; + } + + Builder expectAbort(final Object newAbortReply) { + return expectAbort(AbortTransaction.class, newAbortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } }