+ private void onMessage(CommitProtocolCommand<?> message) {
+ final ActorRef sender = getSender();
+ TransactionIdentifier txId = message.getTxId();
+ ListenableFuture<S> future = process(handledMessageType.cast(message));
+ Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
+ : runnable -> executeInSelf(runnable);
+ Futures.addCallback(future, new FutureCallback<S>() {
+ @Override
+ public void onSuccess(S nextStep) {
+ success(txId, sender, nextStep);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ failed(txId, sender, failure);
+ }
+ }, callbackExecutor);
+ }
+
+ private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
+ currentStateMap.remove(txId);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+
+ private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
+ currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ private void onAbort(TransactionIdentifier txId) {
+ currentStateMap.remove(txId);
+ final ActorRef sender = getSender();
+ Futures.addCallback(abort(), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object noop) {
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ @Nullable
+ abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
+
+ @Nonnull
+ abstract ListenableFuture<S> process(M command);