X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=8f3a467d55de9cb739b6d24a7e5db6cfa34910cb;hb=90735c91ce3390a731afc446b4e7314c544ab9d6;hp=dffb69c36d3969c1bca35f9f6c5146c50ca1cf7f;hpb=cc7ef3a4cc3eb2027be5558c1564e580fd153087;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index dffb69c36d..8f3a467d55 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -45,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; @@ -63,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -108,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); - private final Map transactionChains = new HashMap<>(); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); @@ -138,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; + private int currentTransactionBatch; + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, @@ -165,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { @@ -189,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -254,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); notifyListeners(candidate); - LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + LOG.debug("{}: state snapshot applied in {}", logContext, elapsed); } /** @@ -336,8 +349,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof DataTreeCandidatePayload) { - applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -382,45 +393,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); } + allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); - } else { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { if (identifier != null) { payloadReplicationComplete((PurgeTransactionPayload) payload); - } else { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); - } else { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); - } else { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -517,7 +526,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null); } return chain; @@ -542,27 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); - } - - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -617,29 +607,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -725,8 +711,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. + */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); @@ -734,6 +735,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { + cohort.throwCanCommitFailure(); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); @@ -948,15 +951,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override + ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Exception failure) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + return cohort; + } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification modification) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, + final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, ticker().read())); return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); @@ -1117,4 +1138,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { runOnPendingTransactionsComplete = null; } } + + ShardStats getStats() { + return shard.getShardMBean(); + } }