- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
- @Nonnull final CohortEntry cohortEntry) {
- LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
- try {
- try {
- cohortEntry.commit();
- } catch(ExecutionException e) {
- // We may get a "store tree and candidate base differ" IllegalStateException from commit under
- // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
- // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
- // applying it to the state. We then become the leader and a second tx is pre-committed and
- // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
- // candidate via applyState prior to the second tx. Since the second tx has already been
- // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
- // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
- // solution will be forthcoming.
- if(e.getCause() instanceof IllegalStateException) {
- LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
- transactionID, e);
- store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
- } else {
- throw e;
- }
- }
-
- sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
-
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
- }
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
- // With persistence enabled, this method is called via applyState by the leader strategy
- // after the commit has been replicated to a majority of the followers.
-
- CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if (cohortEntry == null) {
- // The transaction is no longer the current commit. This can happen if the transaction
- // was aborted prior, most likely due to timeout in the front-end. We need to finish
- // committing the transaction though since it was successfully persisted and replicated
- // however we can't use the original cohort b/c it was already preCommitted and may
- // conflict with the current commit or may have been aborted so we commit with a new
- // transaction.
- cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
- if(cohortEntry != null) {
- try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
- } catch (DataValidationFailedException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
- }
-
- sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
- getSelf());
- } else {
- // This really shouldn't happen - it likely means that persistence or replication
- // took so long to complete such that the cohort entry was expired from the cache.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
- sender.tell(new akka.actor.Status.Failure(ex), getSelf());
- }
- } else {
- finishCommit(sender, transactionID, cohortEntry);
- }
- }
-