+ LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (InterruptedException | ExecutionException e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ shardMBean.incrementFailedTransactionsCount();
+ }
+
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+
+ private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+ LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
+ LOG.debug("Readying transaction {}", ready.getTransactionID());
+
+ // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
+ // commitCoordinator in preparation for the subsequent three phase commit initiated by
+ // the front-end.
+ commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
+ ready.getModification());
+
+ // Return our actor path as we'll handle the three phase commit.
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(self()));
+ getSender().tell(
+ ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
+ getSelf());
+ }
+
+ private void handleAbortTransaction(AbortTransaction abort) {
+ doAbortTransaction(abort.getTransactionID(), getSender());
+ }
+
+ private void doAbortTransaction(String transactionID, final ActorRef sender) {
+ final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ LOG.debug("Aborting transaction {}", transactionID);
+
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ commitCoordinator.currentTransactionComplete(transactionID, false);
+
+ final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
+ final ActorRef self = getSelf();
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void v) {
+ shardMBean.incrementAbortTransactionsCount();