X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=694be4d1d16ac1e723359531fac964908bfdc3f3;hp=e8469d439d7df19b6fc5828574492beadf5109f1;hb=04ce3cfa566f7667800abb376d533f41246ff909;hpb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index e8469d439d..694be4d1d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -14,7 +14,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -24,7 +23,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -45,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; @@ -63,7 +62,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -108,8 +106,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); - private final Map transactionChains = new HashMap<>(); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); @@ -138,7 +143,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + private int currentTransactionBatch; + + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -153,26 +160,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); + treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); } @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { return logContext; } - final Ticker ticker() { - return shard.ticker(); + final long readTime() { + return shard.ticker().read(); } public TipProducingDataTree getDataTree() { @@ -188,6 +197,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -253,7 +266,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); notifyListeners(candidate); - LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + LOG.debug("{}: state snapshot applied in {}", logContext, elapsed); } /** @@ -335,8 +348,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof DataTreeCandidatePayload) { - applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -381,51 +392,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); } + allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); - } else { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { if (identifier != null) { payloadReplicationComplete((PurgeTransactionPayload) payload); - } else { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); - } - } else if (payload instanceof CloseLocalHistoryPayload) { - if (identifier != null) { - payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); - } else { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); - } else { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -517,12 +520,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + @Nullable final Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + } else if (callback != null) { + callback.run(); } return chain; @@ -533,7 +539,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } - return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { @@ -542,32 +548,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { .newModification()); } - return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId); } @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); - } - - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -622,29 +609,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -658,11 +641,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, AbortTransactionPayload.create(id), callback); } - @Override - void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { - LOG.debug("{}: purging transaction {}", logContext, id); - replicatePayload(id, PurgeTransactionPayload.create(id), callback); + void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + // No-op for free-standing transactions + } @Override @@ -673,6 +655,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot); } + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + LOG.debug("{}: purging transaction {}", logContext, id); + replicatePayload(id, PurgeTransactionPayload.create(id), callback); + } + public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } @@ -686,24 +673,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return dataTree.takeSnapshot().newModification(); } - /** - * Commits a modification. - * - * @deprecated This method violates DataTree containment and will be removed. - */ - @VisibleForTesting - @Deprecated - public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { - // Direct modification commit is a utility, which cannot be used while we have transactions in-flight - Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); - - modification.ready(); - dataTree.validate(modification); - DataTreeCandidate candidate = dataTree.prepare(modification); - dataTree.commit(candidate); - return candidate; - } - public Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(getQueueSize()); @@ -726,8 +695,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. + */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); @@ -735,10 +719,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { + cohort.throwCanCommitFailure(); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), @@ -838,7 +824,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Set the tip of the data tree. tip = Verify.verifyNotNull(candidate); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); pendingTransactions.remove(); pendingCommits.add(entry); @@ -949,19 +935,37 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override + ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Exception failure) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, readTime())); + return cohort; + } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification modification) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, + final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = ticker().read(); + final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; @@ -1118,4 +1122,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { runOnPendingTransactionsComplete = null; } } + + ShardStats getStats() { + return shard.getShardMBean(); + } }