+ @Override
+ public ListenableFuture<Void> commit() {
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
+
+ return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
+ CommitTransactionReply.class, true, operationCallback);
+ }
+
+ private static boolean successfulFuture(ListenableFuture<Void> future) {
+ if(!future.isDone()) {
+ return false;
+ }
+
+ try {
+ future.get();
+ return true;
+ } catch(Exception e) {
+ return false;
+ }
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName,
+ final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
+ final boolean propagateException, final OperationCallback callback) {
+ LOG.debug("Tx {} {}", transactionId, operationName);
+
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+ // The cohort actor list should already be built at this point by the canCommit phase but,
+ // if not for some reason, we'll try to build it here.
+
+ ListenableFuture<Void> future = resolveCohorts();
+ if(successfulFuture(future)) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
+ returnFuture, callback);
+ } else {
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void notUsed) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
+ propagateException, returnFuture, callback);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
+
+ if(propagateException) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(null);
+ }
+ }
+ });
+ }
+
+ return returnFuture;