+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
+ shardMBean.incrementFailedTransactionsCount();
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+ }
+