- final long now = shard.ticker().read();
- final CommitEntry currentTx = pendingTransactions.peek();
- if (currentTx != null && currentTx.lastAccess + timeout < now) {
- LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
- boolean processNext = true;
- switch (currentTx.cohort.getState()) {
- case CAN_COMMIT_PENDING:
- pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException());
- break;
- case CAN_COMMIT_COMPLETE:
- // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
- // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
- // in PRE_COMMIT_COMPLETE is changed.
- pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
- break;
- case PRE_COMMIT_PENDING:
- pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException());
- break;
- case PRE_COMMIT_COMPLETE:
- // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
- // are ready we should commit the transaction, not abort it. Our current software stack does
- // not allow us to do that consistently, because we persist at the time of commit, hence
- // we can end up in a state where we have pre-committed a transaction, then a leader failover
- // occurred ... the new leader does not see the pre-committed transaction and does not have
- // a running timer. To fix this we really need two persistence events.
- //
- // The first one, done at pre-commit time will hold the transaction payload. When consensus
- // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
- // apply the state in this event.
- //
- // The second one, done at commit (or abort) time holds only the transaction identifier and
- // signals to followers that the state should (or should not) be applied.
- //
- // In order to make the pre-commit timer working across failovers, though, we need
- // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
- // restart the timer.
- pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
- break;
- case COMMIT_PENDING:
- LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
- currentTx.lastAccess = now;
- processNext = false;
- return;
- case ABORTED:
- case COMMITTED:
- case FAILED:
- case READY:
- default:
- pendingTransactions.remove();
+ final long now = readTime();
+
+ final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
+ !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
+ final CommitEntry currentTx = currentQueue.peek();
+ if (currentTx == null) {
+ // Empty queue, no-op
+ return;
+ }
+
+ long delta = now - currentTx.lastAccess;
+ if (delta < timeout) {
+ // Not expired yet, bail
+ return;
+ }
+
+ final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+ if (updateOpt.isPresent()) {
+ final long newAccess = updateOpt.get().longValue();
+ final long newDelta = now - newAccess;
+ if (newDelta < delta) {
+ LOG.debug("{}: Updated current transaction {} access time", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = newAccess;
+ delta = newDelta;