+ private Future<Iterable<Object>> invokeCohorts(Object message) {
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
+ for(ActorPath actorPath : cohortPaths) {
+
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
+ ActorContext.ASK_DURATION));
+ }
+
+ return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ LOG.debug("txn {} preCommit", transactionId);
+ return voidOperation(new PreCommitTransaction().toSerializable(),
+ PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ LOG.debug("txn {} abort", transactionId);
+
+ // Note - we pass false for propagateException. In the front-end data broker, this method
+ // is called when one of the 3 phases fails with an exception. We'd rather have that
+ // original exception propagated to the client. If our abort fails and we propagate the
+ // exception then that exception will supersede and suppress the original exception. But
+ // it's the original exception that is the root cause and of more interest to the client.
+
+ return voidOperation(new AbortTransaction().toSerializable(),
+ AbortTransactionReply.SERIALIZABLE_CLASS, false);