X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxyTest.java;h=5a87be3d32837714289fc26b6967ce527621ca86;hp=7c10148d438535234d345f2274a6d43f3ac8ecf2;hb=24c074a4b32ac97980a652b78824b7c2f97ffb78;hpb=c3ea6ff9c64bcd51e46767a18a1370a2b3819dca diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 7c10148d43..5a87be3d32 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -9,42 +9,45 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import akka.actor.ActorPath; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; import akka.dispatch.Futures; -import akka.util.Timeout; +import akka.testkit.TestActorRef; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.Lists; +import com.google.common.base.Supplier; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.stubbing.Stubber; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -52,12 +55,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { static class TestException extends RuntimeException { } - @Mock private ActorContext actorContext; - @Mock - private DatastoreContext datastoreContext; - @Mock private Timer commitTimer; @@ -67,15 +66,27 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @Mock private Snapshot commitSnapshot; + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + @Before public void setUp() { MockitoAnnotations.initMocks(this); - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); - doReturn(datastoreContext).when(actorContext).getDatastoreContext(); - doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); - doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); + actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(String operationName) { + return commitTimer; + } + + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + doReturn(commitTimerContext).when(commitTimer).time(); doReturn(commitSnapshot).when(commitTimer).getSnapshot(); for(int i=1;i<11;i++){ @@ -83,281 +94,336 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); } - doReturn(10.0).when(actorContext).getTxCreationLimit(); - } - - private Future newCohort() { - ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path(); - ActorSelection actorSelection = getSystem().actorSelection(path); - return Futures.successful(actorSelection); - } - - private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception { - List> cohortFutures = Lists.newArrayList(); - for(int i = 1; i <= nCohorts; i++) { - cohortFutures.add(newCohort()); - } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); - } - - private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath() - throws Exception { - List> cohortFutures = Lists.newArrayList(); - cohortFutures.add(newCohort()); - cohortFutures.add(Futures.failed(new TestException())); - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); - } - - private void setupMockActorContext(Class requestType, Object... responses) { - Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures - .failed((Throwable) responses[0]) : Futures - .successful(((SerializableMessage) responses[0]).toSerializable())); - - for(int i = 1; i < responses.length; i++) { - stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures - .failed((Throwable) responses[i]) : Futures - .successful(((SerializableMessage) responses[i]).toSerializable())); - } - - stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType), any(Timeout.class)); - - doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) - .when(actorContext).getTransactionCommitOperationTimeout(); } - private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType), any(Timeout.class)); - } - - private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { + @Test + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1"); - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch(ExecutionException e) { - throw e.getCause(); - } + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithOneCohort() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1"); - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.NO); - - future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohorts() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + public void testCanCommitYesWithTwoCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(3); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES); - - ListenableFuture future = proxy.canCommit(); - - Boolean actual = future.get(5, TimeUnit.SECONDS); - - assertEquals("canCommit", false, actual); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + public void testCanCommitNoWithThreeCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test(expected = TestException.class) public void testCanCommitWithExceptionFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException()); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1"); propagateExecutionExceptionCause(proxy.canCommit()); } - @Test(expected = ExecutionException.class) - public void testCanCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); + @Test(expected = IllegalArgumentException.class) + public void testCanCommitWithInvalidResponseType() throws Throwable { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1"); - proxy.canCommit().get(5, TimeUnit.SECONDS); + propagateExecutionExceptionCause(proxy.canCommit()); } @Test(expected = TestException.class) - public void testCanCommitWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + public void testCanCommitWithFailedCohortFuture() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1")), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); - try { - propagateExecutionExceptionCause(proxy.canCommit()); - } finally { - verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS); - } + propagateExecutionExceptionCause(proxy.canCommit()); } @Test - public void testPreCommit() throws Exception { - // Precommit is currently a no-op - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + public void testAllThreePhasesSuccessful() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); + } + + @Test(expected = TestException.class) + public void testCommitWithExceptionFailure() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(new TestException()))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); + } - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); + @Test(expected = IllegalArgumentException.class) + public void testCommitWithInvalidResponseType() throws Throwable { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit("invalid"))), "txn-1"); - proxy.preCommit().get(5, TimeUnit.SECONDS); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); } @Test public void testAbort() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply()); - - proxy.abort().get(5, TimeUnit.SECONDS); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1"); - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test public void testAbortWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1"); // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testAbortWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS); + public void testAbortWithFailedCohortFuture() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new CommitTransactionReply()); - - proxy.commit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Collections.emptyList(), "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCommitWithFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new TestException()); + private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { - propagateExecutionExceptionCause(proxy.commit()); + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch(ExecutionException e) { + verifyCohortActors(); + throw e.getCause(); + } } - @Test(expected = ExecutionException.class) - public void testCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply()); + private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier() { + @Override + public Short get() { + return version; + } + }); + } - proxy.commit().get(5, TimeUnit.SECONDS); + private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) { + return new CohortInfo(Futures.failed(failure), new Supplier() { + @Override + public Short get() { + return CURRENT_VERSION; + } + }); } - @Test(expected = TestException.class) - public void testCommitWithFailedCohortPath() throws Throwable { + private CohortInfo newCohortInfo(CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); + } - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + private void verifyCohortActors() { + for(TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } + private T verifySuccessfulFuture(ListenableFuture future) throws Exception { try { - propagateExecutionExceptionCause(proxy.commit()); - } finally { - - verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); + return future.get(5, TimeUnit.SECONDS); + } catch(Exception e) { + verifyCohortActors(); + throw e; } - } - @Test - public void testAllThreePhasesSuccessful() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); - - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply(), new PreCommitTransactionReply()); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, - new CommitTransactionReply(), new CommitTransactionReply()); + private void verifyCanCommit(ListenableFuture future, boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + private static class CohortActor extends UntypedActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + private CohortActor(Builder builder) { + this.builder = builder; + } - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + @Override + public void onReceive(Object message) { + if(CanCommitTransaction.isSerializedType(message)) { + canCommitCount.incrementAndGet(); + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + } else if(CommitTransaction.isSerializedType(message)) { + commitCount.incrementAndGet(); + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + } else if(AbortTransaction.isSerializedType(message)) { + abortCount.incrementAndGet(); + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } - } + private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage, + Class expType, Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID()); + + if(reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch(AssertionError e) { + assertionError = e; + } + } - @Test - public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + void verify() { + if(assertionError != null) { + throw assertionError; + } - ThreePhaseCommitCohortProxy proxy = setupProxy(0); + if(builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + if(builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + if(builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final String transactionId; + + Builder(String transactionId) { + this.transactionId = transactionId; + } + + Builder expectCanCommit(Class expCanCommitType, Object canCommitReply) { + this.expCanCommitType = expCanCommitType; + this.canCommitReply = canCommitReply; + return this; + } + + Builder expectCanCommit(Object canCommitReply) { + return expectCanCommit(CanCommitTransaction.class, canCommitReply); + } + + Builder expectCommit(Class expCommitType, Object commitReply) { + this.expCommitType = expCommitType; + this.commitReply = commitReply; + return this; + } + + Builder expectCommit(Object commitReply) { + return expectCommit(CommitTransaction.class, commitReply); + } + + Builder expectAbort(Class expAbortType, Object abortReply) { + this.expAbortType = expAbortType; + this.abortReply = abortReply; + return this; + } + + Builder expectAbort(Object abortReply) { + return expectAbort(AbortTransaction.class, abortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } }