X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxyTest.java;h=45319712ae87bfd3c30cc9704bddf66d4e25fea7;hp=b013515f2595950cba669ac08dfdb1a8eefe37fa;hb=4a114730dfc6b5fd914582020a55601ea288a498;hpb=bead4f9288b0c758ac736ee8945ca01313b177da diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index b013515f25..45319712ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,351 +1,405 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import akka.actor.ActorPath; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.lenient; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; + import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.UntypedAbstractActor; +import akka.dispatch.Dispatchers; import akka.dispatch.Futures; -import akka.util.Timeout; +import akka.testkit.TestActorRef; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.stubbing.Stubber; +import org.mockito.junit.MockitoJUnitRunner; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import scala.concurrent.Future; - +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { - - @SuppressWarnings("serial") static class TestException extends RuntimeException { - } + private static final long serialVersionUID = 1L; - @Mock - private ActorContext actorContext; + } - @Mock - private DatastoreContext datastoreContext; + private ActorUtils actorUtils; @Mock private Timer commitTimer; - @Mock private Timer.Context commitTimerContext; - @Mock private Snapshot commitSnapshot; + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + private final TransactionIdentifier tx = nextTransactionId(); + @Before public void setUp() { - MockitoAnnotations.initMocks(this); - - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(datastoreContext).when(actorContext).getDatastoreContext(); - doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); - doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); - doReturn(commitTimerContext).when(commitTimer).time(); - doReturn(commitSnapshot).when(commitTimer).getSnapshot(); - doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile(); - doReturn(10.0).when(actorContext).getTxCreationLimit(); - } - - private Future newCohort() { - ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path(); - ActorSelection actorSelection = getSystem().actorSelection(path); - return Futures.successful(actorSelection); - } - - private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception { - List> cohortFutures = Lists.newArrayList(); - for(int i = 1; i <= nCohorts; i++) { - cohortFutures.add(newCohort()); + actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), + new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(final String operationName) { + return commitTimer; + } + + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + + lenient().doReturn(commitTimerContext).when(commitTimer).time(); + lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + for (int i = 1; i < 11; i++) { + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); } - private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath() - throws Exception { - List> cohortFutures = Lists.newArrayList(); - cohortFutures.add(newCohort()); - cohortFutures.add(Futures.failed(new TestException())); + @Test + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))), + tx); - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } - private void setupMockActorContext(Class requestType, Object... responses) { - Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures - .failed((Throwable) responses[0]) : Futures - .successful(((SerializableMessage) responses[0]).toSerializable())); - - for(int i = 1; i < responses.length; i++) { - stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures - .failed((Throwable) responses[i]) : Futures - .successful(((SerializableMessage) responses[i]).toSerializable())); - } - - stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType), any(Timeout.class)); - } + @Test + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))), + tx); - private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType), any(Timeout.class)); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } - private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { - - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch(ExecutionException e) { - throw e.getCause(); - } + @Test + public void testCanCommitYesWithTwoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))), + tx); + + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithOneCohort() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.NO); - - future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + public void testCanCommitNoWithThreeCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx))), tx); + + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohorts() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); - - ListenableFuture future = proxy.canCommit(); + public void testCanCommitWithExceptionFailure() { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx); - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + propagateExecutionExceptionCause(proxy.canCommit(), TestException.class); } @Test - public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(3); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES); + public void testCanCommitWithInvalidResponseType() { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx); - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + assertEquals("Unexpected response type class java.lang.String", + propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class)); } - @Test(expected = TestException.class) - public void testCanCommitWithExceptionFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException()); + @Test + public void testCanCommitWithFailedCohortFuture() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx)), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder(tx))), tx); - propagateExecutionExceptionCause(proxy.canCommit()); + propagateExecutionExceptionCause(proxy.canCommit(), TestException.class); } - @Test(expected = ExecutionException.class) - public void testCanCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); - - proxy.canCommit().get(5, TimeUnit.SECONDS); + @Test + public void testAllThreePhasesSuccessful() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCanCommitWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - try { - propagateExecutionExceptionCause(proxy.canCommit()); - } finally { - verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS); - } + @Test + public void testCommitWithExceptionFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(new TestException()))), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit(), TestException.class); } @Test - public void testPreCommit() throws Exception { - // Precommit is currently a no-op - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); - - proxy.preCommit().get(5, TimeUnit.SECONDS); + public void testCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit("invalid"))), + tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + assertEquals("Unexpected response type class java.lang.String", + propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class)); } @Test public void testAbort() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply()); - - proxy.abort().get(5, TimeUnit.SECONDS); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), + tx); - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test public void testAbortWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, + List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx); // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testAbortWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + public void testAbortWithFailedCohortFuture() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of( + newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx); - // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx); - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new CommitTransactionReply()); - - proxy.commit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCommitWithFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new TestException()); - - propagateExecutionExceptionCause(proxy.commit()); + private String propagateExecutionExceptionCause(final ListenableFuture future, + final Class expected) { + final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause(); + verifyCohortActors(); + assertThat(ex, instanceOf(expected)); + return ex.getMessage(); } - @Test(expected = ExecutionException.class) - public void testCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply()); + private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version); + } - proxy.commit().get(5, TimeUnit.SECONDS); + private CohortInfo newCohortInfo(final CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); } - @Test(expected = TestException.class) - public void testCommitWithFailedCohortPath() throws Throwable { + private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) { + return new CohortInfo(Futures.failed(failure), () -> CURRENT_VERSION); + } - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + private void verifyCohortActors() { + for (TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } + @SuppressWarnings("checkstyle:IllegalCatch") + private T verifySuccessfulFuture(final ListenableFuture future) throws Exception { try { - propagateExecutionExceptionCause(proxy.commit()); - } finally { - - verify(actorContext, never()).setTxCreationLimit(anyLong()); - verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); + return future.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + verifyCohortActors(); + throw e; } - } - @Test - public void testAllThreePhasesSuccessful() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); - - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply(), new PreCommitTransactionReply()); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, - new CommitTransactionReply(), new CommitTransactionReply()); + private void verifyCanCommit(final ListenableFuture future, final boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + private static class CohortActor extends UntypedAbstractActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + CohortActor(final Builder builder) { + this.builder = builder; + } - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + @Override + public void onReceive(final Object message) { + if (CanCommitTransaction.isSerializedType(message)) { + canCommitCount.incrementAndGet(); + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + } else if (CommitTransaction.isSerializedType(message)) { + commitCount.incrementAndGet(); + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + } else if (AbortTransaction.isSerializedType(message)) { + abortCount.incrementAndGet(); + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } - // Verify that the creation limit was changed to 0.5 (based on setup) - verify(actorContext, timeout(5000)).setTxCreationLimit(0.5); - } + private void onMessage(final String name, final Object rawMessage, + final AbstractThreePhaseCommitMessage actualMessage, final Class expType, final Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId()); + + if (reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch (AssertionError e) { + assertionError = e; + } + } - @Test - public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + void verify() { + if (assertionError != null) { + throw assertionError; + } - ThreePhaseCommitCohortProxy proxy = setupProxy(0); + if (builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + if (builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + if (builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } - verify(actorContext, never()).setTxCreationLimit(anyLong()); + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final TransactionIdentifier transactionId; + + Builder(final TransactionIdentifier transactionId) { + this.transactionId = requireNonNull(transactionId); + } + + Builder expectCanCommit(final Class newExpCanCommitType, final Object newCanCommitReply) { + expCanCommitType = newExpCanCommitType; + canCommitReply = newCanCommitReply; + return this; + } + + Builder expectCanCommit(final Object newCanCommitReply) { + return expectCanCommit(CanCommitTransaction.class, newCanCommitReply); + } + + Builder expectCommit(final Class newExpCommitType, final Object newCommitReply) { + expCommitType = newExpCommitType; + commitReply = newCommitReply; + return this; + } + + Builder expectCommit(final Object newCommitReply) { + return expectCommit(CommitTransaction.class, newCommitReply); + } + + Builder expectAbort(final Class newExpAbortType, final Object newAbortReply) { + expAbortType = newExpAbortType; + abortReply = newAbortReply; + return this; + } + + Builder expectAbort(final Object newAbortReply) { + return expectAbort(AbortTransaction.class, newAbortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } }