X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=7aecda48db6ceaf7ca3a5dbc5ae266847b6676ca;hp=694be4d1d16ac1e723359531fac964908bfdc3f3;hb=64bc1360aedb83583edb354444ee3e4295c7a5e6;hpb=04ce3cfa566f7667800abb376d533f41246ff909 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 694be4d1d1..7aecda48db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -719,8 +719,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - cohort.throwCanCommitFailure(); - tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); @@ -787,8 +785,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } void startCanCommit(final SimpleShardDataTreeCohort cohort) { - final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; - if (!cohort.equals(current)) { + final CommitEntry head = pendingTransactions.peek(); + if (head == null) { + LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort); + return; + } + if (!cohort.equals(head.cohort)) { LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); return; } @@ -938,15 +940,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); + final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification mod) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; @@ -971,21 +972,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); if (currentTx != null && currentTx.lastAccess + timeout < now) { + final State state = currentTx.cohort.getState(); LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); + currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state); boolean processNext = true; - switch (currentTx.cohort.getState()) { + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + transactionCommitTimeoutMillis + "ms"); + + switch (state) { case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); + currentQueue.remove().cohort.failedCanCommit(cohortFailure); break; case CAN_COMMIT_COMPLETE: // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); + currentQueue.remove().cohort.reportFailure(cohortFailure); break; case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); + currentQueue.remove().cohort.failedPreCommit(cohortFailure); break; case PRE_COMMIT_COMPLETE: // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we @@ -1005,7 +1010,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // In order to make the pre-commit timer working across failovers, though, we need // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately // restart the timer. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); + currentQueue.remove().cohort.reportFailure(cohortFailure); break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, @@ -1013,10 +1018,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { currentTx.lastAccess = now; processNext = false; return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; case ABORTED: case COMMITTED: case FAILED: - case READY: default: currentQueue.remove(); } @@ -1126,4 +1133,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }