X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxyTest.java;h=ba7295bcea75a7a3b4f9806549065211a0290f13;hp=3c9d857fe81db981d341233e2c7249f06273bee6;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hpb=7ca766e911670b348d68c191ba16a903f1bdc245 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 3c9d857fe8..ba7295bcea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,44 +1,56 @@ -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorPath; -import akka.actor.ActorSelection; -import akka.actor.Props; -import akka.dispatch.Futures; +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; +package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; +import akka.dispatch.Futures; +import akka.testkit.TestActorRef; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.stubbing.Stubber; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; - -import scala.concurrent.Future; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -46,278 +58,367 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { static class TestException extends RuntimeException { } - @Mock private ActorContext actorContext; - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - doReturn(getSystem()).when(actorContext).getActorSystem(); - } - - private Future newCohortPath() { - ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path(); - doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path); - return Futures.successful(path); - } + @Mock + private Timer commitTimer; - private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception { - List> cohortPathFutures = Lists.newArrayList(); - for(int i = 1; i <= nCohorts; i++) { - cohortPathFutures.add(newCohortPath()); - } + @Mock + private Timer.Context commitTimerContext; - return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1"); - } + @Mock + private Snapshot commitSnapshot; - private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath() - throws Exception { - List> cohortPathFutures = Lists.newArrayList(); - cohortPathFutures.add(newCohortPath()); - cohortPathFutures.add(Futures.failed(new TestException())); + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + private final TransactionIdentifier tx = nextTransactionId(); - return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1"); - } - private void setupMockActorContext(Class requestType, Object... responses) { - Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures - .failed((Throwable) responses[0]) : Futures - .successful(((SerializableMessage) responses[0]).toSerializable())); + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); - for(int i = 1; i < responses.length; i++) { - stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures - .failed((Throwable) responses[i]) : Futures - .successful(((SerializableMessage) responses[i]).toSerializable())); + actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), + new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(final String operationName) { + return commitTimer; + } + + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + + doReturn(commitTimerContext).when(commitTimer).time(); + doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + for (int i = 1; i < 11; i++) { + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); } - - stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType)); } - private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType)); - } - - private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { + @Test + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx); - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch(ExecutionException e) { - throw e.getCause(); - } + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithOneCohort() throws Exception { + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION)))), tx); - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(false)); - - future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohorts() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + public void testCanCommitYesWithTwoCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(3); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(false), - new CanCommitTransactionReply(true)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + public void testCanCommitNoWithThreeCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test(expected = TestException.class) - public void testCanCommitWithExceptionFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException()); + public void testCanCommitWithExceptionFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx); propagateExecutionExceptionCause(proxy.canCommit()); } - @Test(expected = ExecutionException.class) + @Test(expected = IllegalArgumentException.class) public void testCanCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx); - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); - - proxy.canCommit().get(5, TimeUnit.SECONDS); + propagateExecutionExceptionCause(proxy.canCommit()); } @Test(expected = TestException.class) - public void testCanCommitWithFailedCohortPath() throws Throwable { + public void testCanCommitWithFailedCohortFuture() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx)), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - try { - propagateExecutionExceptionCause(proxy.canCommit()); - } finally { - verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS); - } + propagateExecutionExceptionCause(proxy.canCommit()); } @Test - public void testPreCommit() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply()); - - proxy.preCommit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS); + public void testAllThreePhasesSuccessful() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = ExecutionException.class) - public void testPreCommitWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(2); + @Test(expected = TestException.class) + public void testCommitWithExceptionFailure() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(new TestException()))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); + } - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply(), new RuntimeException("mock")); + @Test(expected = IllegalArgumentException.class) + public void testCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Arrays.asList(newCohortInfo(new CohortActor.Builder(tx) + .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx); - proxy.preCommit().get(5, TimeUnit.SECONDS); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); } @Test public void testAbort() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply()); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), tx); - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test public void testAbortWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx); // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testAbortWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + public void testAbortWithFailedCohortFuture() throws Exception { + List cohorts = Arrays.asList( + newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); - // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new CommitTransactionReply()); - - proxy.commit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Collections.emptyList(), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCommitWithFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), - new TestException()); + @SuppressWarnings("checkstyle:avoidHidingCauseException") + private void propagateExecutionExceptionCause(final ListenableFuture future) throws Exception { + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + verifyCohortActors(); + Throwables.propagateIfPossible(e.getCause(), Exception.class); + throw new RuntimeException(e.getCause()); + } + } - propagateExecutionExceptionCause(proxy.commit()); + private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version); } - @Test(expected = ExecutionException.class) - public void testCommitWithInvalidResponseType() throws Exception { + private CohortInfo newCohortInfo(final CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); + } - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) { + return new CohortInfo(Futures.failed(failure), () -> CURRENT_VERSION); + } - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply()); + private void verifyCohortActors() { + for (TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } - proxy.commit().get(5, TimeUnit.SECONDS); + @SuppressWarnings("checkstyle:IllegalCatch") + private T verifySuccessfulFuture(final ListenableFuture future) throws Exception { + try { + return future.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + verifyCohortActors(); + throw e; + } } - @Test(expected = TestException.class) - public void testCommitWithFailedCohortPath() throws Throwable { + private void verifyCanCommit(final ListenableFuture future, final boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + private static class CohortActor extends UntypedActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; - try { - propagateExecutionExceptionCause(proxy.commit()); - } finally { - verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); + CohortActor(final Builder builder) { + this.builder = builder; } - } - @Test - public void testAllThreePhasesSuccessful() throws Exception { + @Override + public void onReceive(final Object message) { + if (CanCommitTransaction.isSerializedType(message)) { + canCommitCount.incrementAndGet(); + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + } else if (CommitTransaction.isSerializedType(message)) { + commitCount.incrementAndGet(); + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + } else if (AbortTransaction.isSerializedType(message)) { + abortCount.incrementAndGet(); + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } - ThreePhaseCommitCohortProxy proxy = setupProxy(2); + private void onMessage(final String name, final Object rawMessage, + final AbstractThreePhaseCommitMessage actualMessage, final Class expType, final Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId()); + + if (reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch (AssertionError e) { + assertionError = e; + } + } - setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); + void verify() { + if (assertionError != null) { + throw assertionError; + } - setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, - new PreCommitTransactionReply(), new PreCommitTransactionReply()); + if (builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } - setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, - new CommitTransactionReply(), new CommitTransactionReply()); + if (builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + if (builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } - verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); - verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS); - verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final TransactionIdentifier transactionId; + + Builder(final TransactionIdentifier transactionId) { + this.transactionId = Preconditions.checkNotNull(transactionId); + } + + Builder expectCanCommit(final Class newExpCanCommitType, final Object newCanCommitReply) { + this.expCanCommitType = newExpCanCommitType; + this.canCommitReply = newCanCommitReply; + return this; + } + + Builder expectCanCommit(final Object newCanCommitReply) { + return expectCanCommit(CanCommitTransaction.class, newCanCommitReply); + } + + Builder expectCommit(final Class newExpCommitType, final Object newCommitReply) { + this.expCommitType = newExpCommitType; + this.commitReply = newCommitReply; + return this; + } + + Builder expectCommit(final Object newCommitReply) { + return expectCommit(CommitTransaction.class, newCommitReply); + } + + Builder expectAbort(final Class newExpAbortType, final Object newAbortReply) { + this.expAbortType = newExpAbortType; + this.abortReply = newAbortReply; + return this; + } + + Builder expectAbort(final Object newAbortReply) { + return expectAbort(AbortTransaction.class, newAbortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } }