- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
- LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
- try {
- cohortEntry.commit();
-
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
- }
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
- // With persistence enabled, this method is called via applyState by the leader strategy
- // after the commit has been replicated to a majority of the followers.
-
- CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if (cohortEntry == null) {
- // The transaction is no longer the current commit. This can happen if the transaction
- // was aborted prior, most likely due to timeout in the front-end. We need to finish
- // committing the transaction though since it was successfully persisted and replicated
- // however we can't use the original cohort b/c it was already preCommitted and may
- // conflict with the current commit or may have been aborted so we commit with a new
- // transaction.
- cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
- if(cohortEntry != null) {
- try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
- } catch (DataValidationFailedException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
- }
-
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());