+ @Override
+ public ListenableFuture<Void> abort() {
+ // Note - we pass false for propagateException. In the front-end data broker, this method
+ // is called when one of the 3 phases fails with an exception. We'd rather have that
+ // original exception propagated to the client. If our abort fails and we propagate the
+ // exception then that exception will supersede and suppress the original exception. But
+ // it's the original exception that is the root cause and of more interest to the client.
+
+ return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
+ AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
+
+ return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
+ CommitTransactionReply.class, true, operationCallback);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static boolean successfulFuture(final ListenableFuture<Void> future) {
+ if (!future.isDone()) {
+ return false;
+ }
+
+ try {
+ future.get();
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName,
+ final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
+ final boolean propagateException, final OperationCallback callback) {
+ LOG.debug("Tx {} {}", transactionId, operationName);
+
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+ // The cohort actor list should already be built at this point by the canCommit phase but,
+ // if not for some reason, we'll try to build it here.
+
+ ListenableFuture<Void> future = resolveCohorts();
+ if (successfulFuture(future)) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
+ returnFuture, callback);
+ } else {
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void notUsed) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
+ propagateException, returnFuture, callback);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
+
+ if (propagateException) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(null);
+ }
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ return returnFuture;
+ }
+
+ private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+ callback.resume();
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(final Throwable failure, final Iterable<Object> responses) {
+ Throwable exceptionToPropagate = failure;
+ if (exceptionToPropagate == null) {
+ for (Object response: responses) {
+ if (!response.getClass().equals(expectedResponseClass)) {
+ exceptionToPropagate = new IllegalArgumentException(
+ String.format("Unexpected response type %s", response.getClass()));
+ break;
+ }
+ }
+ }
+
+ if (exceptionToPropagate != null) {
+ LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
+ if (propagateException) {
+ // We don't log the exception here to avoid redundant logging since we're
+ // propagating to the caller in MD-SAL core who will log it.
+ returnFuture.setException(exceptionToPropagate);
+ } else {
+ // Since the caller doesn't want us to propagate the exception we'll also
+ // not log it normally. But it's usually not good to totally silence
+ // exceptions so we'll log it to debug level.
+ returnFuture.set(null);
+ }
+
+ callback.failure();
+ } else {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+
+ returnFuture.set(null);
+
+ callback.success();
+ }
+ }
+ }, actorUtils.getClientDispatcher());
+ }
+
+ @Override
+ List<Future<ActorSelection>> getCohortFutures() {
+ List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
+ for (CohortInfo info: cohorts) {
+ cohortFutures.add(info.getActorFuture());
+ }
+
+ return cohortFutures;
+ }
+
+ static class CohortInfo {
+ private final Future<ActorSelection> actorFuture;
+ private final Supplier<Short> actorVersionSupplier;
+
+ private volatile ActorSelection resolvedActor;
+
+ CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
+ this.actorFuture = actorFuture;
+ this.actorVersionSupplier = actorVersionSupplier;
+ }
+
+ Future<ActorSelection> getActorFuture() {
+ return actorFuture;
+ }
+
+ ActorSelection getResolvedActor() {
+ return resolvedActor;
+ }
+
+ void setResolvedActor(final ActorSelection resolvedActor) {
+ this.resolvedActor = resolvedActor;
+ }
+
+ short getActorVersion() {
+ checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
+ return actorVersionSupplier.get();
+ }
+ }
+
+ private interface MessageSupplier {
+ Object newMessage(TransactionIdentifier transactionId, short version);
+
+ boolean isSerializedReplyType(Object reply);