}
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable);
- final Exception e;
- if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
- e = new DataStoreUnavailableException(throwable.getMessage(), throwable);
- } else if (throwable instanceof Exception) {
- e = (Exception)throwable;
- } else {
- e = new RuntimeException("Unexpected error occurred", throwable);
- }
-
- final TransactionCommitFailedException clientException = exMapper.apply(e);
// Transaction failed - tell all cohorts to abort.
-
@SuppressWarnings("unchecked")
ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
int index = 0;
canCommitFutures[index++] = cohort.abort();
}
+ // Propagate the original exception
+ final Exception e;
+ if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
+ e = new DataStoreUnavailableException(throwable.getMessage(), throwable);
+ } else if (throwable instanceof Exception) {
+ e = (Exception)throwable;
+ } else {
+ e = new RuntimeException("Unexpected error occurred", throwable);
+ }
+ clientSubmitFuture.setException(exMapper.apply(e));
+
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(List<Void> notUsed) {
// Propagate the original exception to the client.
- clientSubmitFuture.setException(clientException);
+ LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier());
}
@Override
public void onFailure(Throwable failure) {
LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), failure);
-
- // Propagate the original exception as that is what caused the Tx to fail and is
- // what's interesting to the client.
- clientSubmitFuture.setException(clientException);
}
}, MoreExecutors.directExecutor());
}