import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.UnaryOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
- void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+ void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, // timeout in milliseconds, converted to nanoseconds below
+ final Function<SimpleShardDataTreeCohort, Optional<Long>> accessTimeUpdater) { // consulted only when the head entry looks expired
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
!pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
final CommitEntry currentTx = currentQueue.peek();
- if (currentTx != null && currentTx.lastAccess + timeout < now) {
- final State state = currentTx.cohort.getState();
- LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state);
- boolean processNext = true;
- final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
- + transactionCommitTimeoutMillis + "ms");
-
- switch (state) {
- case CAN_COMMIT_PENDING:
- currentQueue.remove().cohort.failedCanCommit(cohortFailure);
- break;
- case CAN_COMMIT_COMPLETE:
- // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
- // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
- // in PRE_COMMIT_COMPLETE is changed.
- currentQueue.remove().cohort.reportFailure(cohortFailure);
- break;
- case PRE_COMMIT_PENDING:
- currentQueue.remove().cohort.failedPreCommit(cohortFailure);
- break;
- case PRE_COMMIT_COMPLETE:
- // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
- // are ready we should commit the transaction, not abort it. Our current software stack does
- // not allow us to do that consistently, because we persist at the time of commit, hence
- // we can end up in a state where we have pre-committed a transaction, then a leader failover
- // occurred ... the new leader does not see the pre-committed transaction and does not have
- // a running timer. To fix this we really need two persistence events.
- //
- // The first one, done at pre-commit time will hold the transaction payload. When consensus
- // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
- // apply the state in this event.
- //
- // The second one, done at commit (or abort) time holds only the transaction identifier and
- // signals to followers that the state should (or should not) be applied.
- //
- // In order to make the pre-commit timer working across failovers, though, we need
- // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
- // restart the timer.
- currentQueue.remove().cohort.reportFailure(cohortFailure);
- break;
- case COMMIT_PENDING:
- LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
- currentTx.lastAccess = now;
- processNext = false;
- return;
- case READY:
- currentQueue.remove().cohort.reportFailure(cohortFailure);
- break;
- case ABORTED:
- case COMMITTED:
- case FAILED:
- default:
- currentQueue.remove();
+ if (currentTx == null) {
+ // All candidate queues are empty (peek() returned null), nothing to expire
+ return;
+ }
+
+ long delta = now - currentTx.lastAccess; // elapsed time since last recorded access, compared against the nanosecond timeout
+ if (delta < timeout) { // NOTE(review): expiry now includes delta == timeout; the removed check (lastAccess + timeout < now) was strict
+ // Less than the timeout has elapsed since the last recorded access, bail
+ return;
+ }
+
+ final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort); // ask the caller-supplied function for a replacement access time
+ if (updateOpt.isPresent()) {
+ final long newAccess = updateOpt.get().longValue();
+ final long newDelta = now - newAccess;
+ if (newDelta < delta) { // accept the update only if it shortens the elapsed time; otherwise lastAccess is left untouched
+ LOG.debug("{}: Updated current transaction {} access time", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = newAccess;
+ delta = newDelta;
}
- if (processNext) {
- processNextPending();
+ if (delta < timeout) {
+ // No longer expired once the refreshed access time is applied, bail
+ return;
}
}
+
+ final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); // log/exception now report measured elapsed time, not the configured timeout
+ final State state = currentTx.cohort.getState();
+
+ LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+ currentTx.cohort.getIdentifier(), deltaMillis, state);
+ boolean processNext = true;
+ final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ + deltaMillis + "ms");
+
+ switch (state) {
+ case CAN_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedCanCommit(cohortFailure);
+ break;
+ case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case PRE_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedPreCommit(cohortFailure);
+ break;
+ case PRE_COMMIT_COMPLETE:
+ // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+ // are ready we should commit the transaction, not abort it. Our current software stack does
+ // not allow us to do that consistently, because we persist at the time of commit, hence
+ // we can end up in a state where we have pre-committed a transaction, then a leader failover
+ // occurred ... the new leader does not see the pre-committed transaction and does not have
+ // a running timer. To fix this we really need two persistence events.
+ //
+ // The first one, done at pre-commit time will hold the transaction payload. When consensus
+ // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+ // apply the state in this event.
+ //
+ // The second one, done at commit (or abort) time holds only the transaction identifier and
+ // signals to followers that the state should (or should not) be applied.
+ //
+ // In order to make the pre-commit timer working across failovers, though, we need
+ // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+ // restart the timer.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case COMMIT_PENDING:
+ LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = now; // entry stays queued; resetting lastAccess restarts its timeout window
+ processNext = false; // NOTE(review): dead store, the return on the next line skips the processNext check
+ return;
+ case READY:
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case ABORTED:
+ case COMMITTED:
+ case FAILED:
+ default:
+ currentQueue.remove(); // these states only drop the head entry; no failure is reported to the cohort
+ }
+
+ if (processNext) {
+ processNextPending();
+ }
}
boolean startAbort(final SimpleShardDataTreeCohort cohort) {