X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=37e65f6a96e5130e53af66372ebb73937cf31332;hp=7b317faaee8619bdcd590cbf11ea59ca6e014b61;hb=653c1f5dd20c851ff0992b8d5ab7b1dcab891fca;hpb=90ba78e0575edaa56610eeed936c03261839f2d2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 7b317faaee..37e65f6a96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -719,8 +720,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - cohort.throwCanCommitFailure(); - tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); @@ -942,15 +941,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); + final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification mod) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; @@ -967,68 +965,103 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); - boolean processNext = true; - switch (currentTx.cohort.getState()) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case ABORTED: - case COMMITTED: - case FAILED: - case READY: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -1130,4 +1163,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }