- final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry != null) {
- LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
- // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
- // aborted during replication in which case we may still commit locally if replication
- // succeeds.
- commitCoordinator.currentTransactionComplete(transactionID, false);
-
- final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
- final ActorRef self = getSelf();
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void v) {
- shardMBean.incrementAbortTransactionsCount();
-
- if(sender != null) {
- sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
- if(sender != null) {
- sender.tell(new akka.actor.Status.Failure(t), self);
- }
- }
- });
- }