X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=4ce848d0c00076305244c9d73c5e45760ff0788c;hp=ae09f1da78394fafd2ae0820b14386519978546b;hb=39b7a263d64559bc4f593726f56aa38ab9cc0b1c;hpb=1663020baaf094c4f2f31a18c787ce4d4efd11c8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index ae09f1da78..4ce848d0c0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -14,7 +14,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -36,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -181,8 +181,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return logContext; } - final Ticker ticker() { - return shard.ticker(); + final long readTime() { + return shard.ticker().read(); } public TipProducingDataTree getDataTree() { @@ -523,12 +523,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + @Nullable final Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + } else if (callback != null) { + callback.run(); } return chain; @@ -539,7 +542,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } - return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { @@ -548,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { .newModification()); } - return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId); } @VisibleForTesting @@ -737,12 +740,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - cohort.throwCanCommitFailure(); - tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), @@ -805,8 +806,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } void startCanCommit(final SimpleShardDataTreeCohort cohort) { - final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; - if (!cohort.equals(current)) { + final CommitEntry head = pendingTransactions.peek(); + if (head == null) { + LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort); + return; + } + if (!cohort.equals(head.cohort)) { LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); return; } @@ -842,7 +847,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Set the tip of the data tree. tip = Verify.verifyNotNull(candidate); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); pendingTransactions.remove(); pendingCommits.add(entry); @@ -956,16 +961,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); - pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @@ -976,72 +981,107 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(txId, mod); } - return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = ticker().read(); + final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); - boolean processNext = true; - switch (currentTx.cohort.getState()) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case ABORTED: - case COMMITTED: - case FAILED: - case READY: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -1143,4 +1183,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }