-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
- }
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
- // With persistence enabled, this method is called via applyState by the leader strategy
- // after the commit has been replicated to a majority of the followers.
-
- CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if (cohortEntry == null) {
- // The transaction is no longer the current commit. This can happen if the transaction
- // was aborted prior, most likely due to timeout in the front-end. We need to finish
- // committing the transaction though since it was successfully persisted and replicated
- // however we can't use the original cohort b/c it was already preCommitted and may
- // conflict with the current commit or may have been aborted so we commit with a new
- // transaction.
- cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
- if(cohortEntry != null) {
- try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
- } catch (DataValidationFailedException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
- }
-
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
- } else {
- // This really shouldn't happen - it likely means that persistence or replication
- // took so long to complete such that the cohort entry was expired from the cache.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
- sender.tell(new akka.actor.Status.Failure(ex), getSelf());
- }
- } else {
- finishCommit(sender, transactionID, cohortEntry);