+ private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+ for(int i = 1; i <= nCohorts; i++) {
+ cohortFutures.add(newCohort());
+ }
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
+ }
+
+ private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+ throws Exception {
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+ cohortFutures.add(newCohort());
+ cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
+ }
+
+ private void setupMockActorContext(Class<?> requestType, Object... responses) {
+ Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
+ .failed((Throwable) responses[0]) : Futures
+ .successful(((SerializableMessage) responses[0]).toSerializable()));
+
+ for(int i = 1; i < responses.length; i++) {
+ stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
+ .failed((Throwable) responses[i]) : Futures
+ .successful(((SerializableMessage) responses[i]).toSerializable()));
+ }
+
+ stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
+ isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
+ }
+
+ private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
+ verify(actorContext, times(nCohorts)).executeOperationAsync(
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
+ }
+
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test
+ public void testCanCommitWithOneCohort() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ CanCommitTransactionReply.YES);
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ CanCommitTransactionReply.NO);
+
+ future = proxy.canCommit();
+
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);