!pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
final CommitEntry currentTx = currentQueue.peek();
if (currentTx != null && currentTx.lastAccess + timeout < now) {
+ final State state = currentTx.cohort.getState();
LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+ currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state);
boolean processNext = true;
- switch (currentTx.cohort.getState()) {
+ final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ + transactionCommitTimeoutMillis + "ms");
+
+ switch (state) {
case CAN_COMMIT_PENDING:
- currentQueue.remove().cohort.failedCanCommit(new TimeoutException());
+ currentQueue.remove().cohort.failedCanCommit(cohortFailure);
break;
case CAN_COMMIT_COMPLETE:
// The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
// whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
// in PRE_COMMIT_COMPLETE is changed.
- currentQueue.remove().cohort.reportFailure(new TimeoutException());
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
break;
case PRE_COMMIT_PENDING:
- currentQueue.remove().cohort.failedPreCommit(new TimeoutException());
+ currentQueue.remove().cohort.failedPreCommit(cohortFailure);
break;
case PRE_COMMIT_COMPLETE:
// FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
// In order to make the pre-commit timer working across failovers, though, we need
// a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
// restart the timer.
- currentQueue.remove().cohort.reportFailure(new TimeoutException());
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
currentTx.lastAccess = now;
processNext = false;
return;
+ case READY:
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
case ABORTED:
case COMMITTED:
case FAILED:
- case READY:
default:
currentQueue.remove();
}