+ private void onMessage(final CommitProtocolCommand<?> message) {
+ final ActorRef sender = getSender();
+ TransactionIdentifier txId = message.getTxId();
+ ListenableFuture<S> future = process(handledMessageType.cast(message));
+ Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
+ : DataTreeCohortActor.this::executeInSelf;
+ Futures.addCallback(future, new FutureCallback<S>() {
+ @Override
+ public void onSuccess(final S nextStep) {
+ success(txId, sender, nextStep);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failed(txId, sender, failure);
+ }
+ }, callbackExecutor);
+ }
+
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
+ currentStateMap.remove(txId);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
+ currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ private void onAbort(final TransactionIdentifier txId) {
+ currentStateMap.remove(txId);
+ final ActorRef sender = getSender();
+ Futures.addCallback(abort(), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object noop) {
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+ }, MoreExecutors.directExecutor());
+ }