+ // The cohort actor list should already be built at this point by the canCommit phase but,
+ // if not for some reason, we'll try to build it here.
+
+ if(cohorts != null) {
+ finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+ returnFuture);
+ } else {
+ buildCohortList().onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+ if(failure != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+ operationName, failure);
+ }
+ if(propagateException) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(null);
+ }
+ } else {
+ finishVoidOperation(operationName, message, expectedResponseClass,
+ propagateException, returnFuture);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
+ return returnFuture;
+ }
+
+ private void finishVoidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+ }
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+