+ private void onAbort(TransactionIdentifier txId) {
+ currentStateMap.remove(txId);
+ final ActorRef sender = getSender();
+ Futures.addCallback(abort(), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object noop) {
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ @Nullable
+ abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
+
+ @Nonnull
+ abstract ListenableFuture<S> process(M command);
+
+ abstract ListenableFuture<?> abort();