+ public void testBackwardsCompatibilityWithPreBoron() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
+ CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
+ expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
+ CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
+ DataStoreVersions.LITHIUM_VERSION));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
+ }
+
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ verifyCohortActors();
+ throw e.getCause();
+ }
+ }
+
+ private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
+ TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
+ cohortActors.add(actor);
+ return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
+ @Override
+ public Short get() {
+ return version;
+ }
+ });
+ }
+
+ private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+ return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
+ @Override
+ public Short get() {
+ return CURRENT_VERSION;
+ }
+ });
+ }
+
+ private CohortInfo newCohortInfo(CohortActor.Builder builder) {
+ return newCohortInfo(builder, CURRENT_VERSION);
+ }
+
+ private void verifyCohortActors() {
+ for(TestActorRef<CohortActor> actor: cohortActors) {
+ actor.underlyingActor().verify();
+ }
+ }
+
+ private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
+ try {
+ return future.get(5, TimeUnit.SECONDS);
+ } catch(Exception e) {
+ verifyCohortActors();
+ throw e;
+ }
+ }
+
+ private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
+ Boolean actual = verifySuccessfulFuture(future);
+ assertEquals("canCommit", expected, actual);
+ }
+
+ private static class CohortActor extends UntypedActor {
+ private final Builder builder;
+ private final AtomicInteger canCommitCount = new AtomicInteger();
+ private final AtomicInteger commitCount = new AtomicInteger();
+ private final AtomicInteger abortCount = new AtomicInteger();
+ private volatile AssertionError assertionError;
+
+ private CohortActor(Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if(CanCommitTransaction.isSerializedType(message)) {
+ canCommitCount.incrementAndGet();
+ onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
+ builder.expCanCommitType, builder.canCommitReply);
+ } else if(CommitTransaction.isSerializedType(message)) {
+ commitCount.incrementAndGet();
+ onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
+ builder.expCommitType, builder.commitReply);
+ } else if(AbortTransaction.isSerializedType(message)) {
+ abortCount.incrementAndGet();
+ onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
+ builder.expAbortType, builder.abortReply);
+ } else {
+ assertionError = new AssertionError("Unexpected message " + message);
+ }
+ }
+
+ private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
+ Class<?> expType, Object reply) {
+ try {
+ assertNotNull("Unexpected " + name, expType);
+ assertEquals(name + " type", expType, rawMessage.getClass());
+ assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
+
+ if(reply instanceof Throwable) {
+ getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
+ } else {
+ getSender().tell(reply, self());
+ }
+ } catch(AssertionError e) {
+ assertionError = e;
+ }
+ }
+
+ void verify() {
+ if(assertionError != null) {
+ throw assertionError;
+ }
+
+ if(builder.expCanCommitType != null) {
+ assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
+ }
+
+ if(builder.expCommitType != null) {
+ assertEquals("CommitTransaction count", 1, commitCount.get());
+ }
+
+ if(builder.expAbortType != null) {
+ assertEquals("AbortTransaction count", 1, abortCount.get());
+ }
+ }
+
+ static class Builder {
+ private Class<?> expCanCommitType;
+ private Class<?> expCommitType;
+ private Class<?> expAbortType;
+ private Object canCommitReply;
+ private Object commitReply;
+ private Object abortReply;
+ private final String transactionId;
+
+ Builder(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
+ this.expCanCommitType = expCanCommitType;
+ this.canCommitReply = canCommitReply;
+ return this;
+ }
+
+ Builder expectCanCommit(Object canCommitReply) {
+ return expectCanCommit(CanCommitTransaction.class, canCommitReply);
+ }
+
+ Builder expectCommit(Class<?> expCommitType, Object commitReply) {
+ this.expCommitType = expCommitType;
+ this.commitReply = commitReply;
+ return this;
+ }
+
+ Builder expectCommit(Object commitReply) {
+ return expectCommit(CommitTransaction.class, commitReply);
+ }
+
+ Builder expectAbort(Class<?> expAbortType, Object abortReply) {
+ this.expAbortType = expAbortType;
+ this.abortReply = abortReply;
+ return this;
+ }
+
+ Builder expectAbort(Object abortReply) {
+ return expectAbort(AbortTransaction.class, abortReply);
+ }
+
+ Props props() {
+ return Props.create(CohortActor.class, this);
+ }
+ }