- private abstract static class Phase {
- abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
-
- abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
- }
-
- private static final class Running extends Phase {
- private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
- private Throwable failure;
-
- void addFuture(final ListenableFuture<Void> execFuture) {
- futures.add(execFuture);
- }
-
- @Override
- void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
- futures.remove(execFuture);
- }
-
- @Override
- void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
- futures.remove(execFuture);
- if (failure == null) {
- failure = cause;
- }
- }
-
- Optional<Throwable> getFailure() {
- return Optional.ofNullable(failure);
- }
- }
-
- private final class Collecting extends Phase {
- private final List<ListenableFuture<Void>> futures;
- private boolean done;
-
- Collecting(final Collection<ListenableFuture<Void>> futures) {
- this.futures = new ArrayList<>(futures);
- }
-
- @Override
- void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
- futures.remove(execFuture);
- if (futures.isEmpty() && !done) {
- LOG.debug("All futures completed successfully.");
- runSuccessful(txCounter);
- }
- }
-
- @Override
- void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
- futures.remove(execFuture);
- done = true;
- runFailed(cause);
- }