X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=a04bf62a96d9e964208e0ac76cd29f104df649a6;hb=refs%2Fchanges%2F75%2F28775%2F21;hp=8832cd6d1f6db28fd834134930bfa68ffac5bfab;hpb=ca8bd9ab89b808ea1008ccf07c389097430bc911;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 8832cd6d1f..a04bf62a96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -64,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Shard shard; private Runnable runOnPendingTransactionsComplete; + /** + * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a + * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current + * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. + */ + private TipProducingDataTreeTip tip; + private SchemaContext schemaContext; public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, @@ -122,6 +132,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); this.metadata = ImmutableList.copyOf(metadata); + tip = dataTree; } public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, @@ -505,6 +516,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting @Deprecated public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { + // Direct modification commit is a utility, which cannot be used while we have transactions in-flight + Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); + modification.ready(); dataTree.validate(modification); DataTreeCandidate candidate = dataTree.prepare(modification); @@ -519,6 +533,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } pendingTransactions.clear(); + tip = dataTree; return ret; } @@ -536,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - dataTree.validate(modification); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); entry.lastAccess = shard.ticker().read(); @@ -591,7 +606,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); final DataTreeCandidateTip candidate; try { - candidate = dataTree.prepare(cohort.getDataTreeModification()); + candidate = tip.prepare(cohort.getDataTreeModification()); } catch (Exception e) { failPreCommit(e); return; @@ -604,6 +619,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); + entry.lastAccess = shard.ticker().read(); cohort.successfulPreCommit(candidate); } @@ -629,6 +647,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + // All pending candidates have been committed, reset the tip to the data tree + if (tip == candidate) { + tip = dataTree; + } + shard.getShardMBean().incrementCommittedTransactionCount(); shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -683,8 +706,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } - @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"}, - justification = "See inline comments below.") + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); @@ -695,16 +717,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { boolean processNext = true; switch (currentTx.cohort.getState()) { case CAN_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); + pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException()); break; case CAN_COMMIT_COMPLETE: // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code // in PRE_COMMIT_COMPLETE is changed. - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + pendingTransactions.remove().cohort.reportFailure(new TimeoutException()); break; case PRE_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException()); + pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException()); break; case PRE_COMMIT_COMPLETE: // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we @@ -724,7 +746,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // In order to make the pre-commit timer working across failovers, though, we need // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately // restart the timer. - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + pendingTransactions.remove().cohort.reportFailure(new TimeoutException()); break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, @@ -737,10 +759,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { case FAILED: case READY: default: - // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In - // this case, we just want to drop the current entry that expired and thus ignore the return value. - // In fact we really shouldn't hit this case but we handle all enums for completeness. - pendingTransactions.poll(); + pendingTransactions.remove(); } if (processNext) { @@ -749,39 +768,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - void startAbort(final SimpleShardDataTreeCohort cohort) { + boolean startAbort(final SimpleShardDataTreeCohort cohort) { final Iterator it = pendingTransactions.iterator(); if (!it.hasNext()) { LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); - return; + return true; } // First entry is special, as it may already be committing final CommitEntry first = it.next(); if (cohort.equals(first.cohort)) { if (cohort.getState() != State.COMMIT_PENDING) { - LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), + LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); - pendingTransactions.remove(); + it.remove(); + rebasePreCommittedTransactions(it, dataTree); processNextTransaction(); - } else { - LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + return true; } - return; + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + return false; } + TipProducingDataTreeTip newTip = dataTree; while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); it.remove(); - return; + rebasePreCommittedTransactions(it, newTip); + return true; + } else { + newTip = cohort.getCandidate(); } } LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + return true; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void rebasePreCommittedTransactions(Iterator iter, TipProducingDataTreeTip newTip) { + tip = newTip; + while (iter.hasNext()) { + final SimpleShardDataTreeCohort cohort = iter.next().cohort; + if (cohort.getState() == State.CAN_COMMIT_COMPLETE) { + LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + } catch (DataValidationFailedException | RuntimeException e) { + LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) { + LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); + cohort.userPreCommit(candidate); + + cohort.setNewCandidate(candidate); + tip = candidate; + } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } + } } void setRunOnPendingTransactionsComplete(final Runnable operation) {