X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=9aca517bb110d2f28381167962f2ad8f3de288af;hb=refs%2Fchanges%2F33%2F67433%2F2;hp=27577b27d8d4f9342a90ed6db72ec2a5ca70c06c;hpb=cd3a0e09db5a1def00c46a4be245dbdf648b539c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 27577b27d8..9aca517bb1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -67,15 +68,15 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -128,24 +129,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; - private final TipProducingDataTree dataTree; + private final DataTree dataTree; private final String logContext; private final Shard shard; private Runnable runOnPendingTransactionsComplete; /** * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a - * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. */ - private TipProducingDataTreeTip tip; + private DataTreeTip tip; private SchemaContext schemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -165,8 +166,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { - this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); + this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, + dataChangeListenerPublisher, logContext, metadata); + } + + private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) { + final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType); + return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(root) + .build()); } @VisibleForTesting @@ -184,7 +194,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return shard.ticker().read(); } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -298,7 +308,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { applySnapshot(snapshot, this::wrapWithPruning); } @@ -332,7 +342,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { + void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException { if (payload instanceof CommitTransactionPayload) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); @@ -618,7 +628,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } Optional readCurrentData() { - final Optional> currentState = + final java.util.Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); @@ -661,7 +671,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Optional> readNode(final YangInstanceIdentifier path) { - return dataTree.takeSnapshot().readNode(path); + return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); } DataTreeSnapshot takeSnapshot() { @@ -964,74 +974,103 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - final State state = currentTx.cohort.getState(); - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state); - boolean processNext = true; - final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " - + transactionCommitTimeoutMillis + "ms"); - - switch (state) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(cohortFailure); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(cohortFailure); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case READY: - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case ABORTED: - case COMMITTED: - case FAILED: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -1062,7 +1101,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1084,7 +1123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; @@ -1133,4 +1172,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }