X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=8f3a467d55de9cb739b6d24a7e5db6cfa34910cb;hp=1460982bf3d4010fa4558074033dd3df7b510033;hb=43130cfeb2a1ac9f733ac8a777cabb36ff1277af;hpb=4d7d88d74ee1177774fad5bd31ceaec2cee3056c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 1460982bf3..8f3a467d55 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; @@ -106,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); - private final Map transactionChains = new HashMap<>(); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); @@ -136,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; + private int currentTransactionBatch; + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, @@ -163,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { @@ -187,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -252,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); notifyListeners(candidate); - LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + LOG.debug("{}: state snapshot applied in {}", logContext, elapsed); } /** @@ -378,45 +393,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); } + allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); - } else { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { if (identifier != null) { payloadReplicationComplete((PurgeTransactionPayload) payload); - } else { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); - } else { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); - } else { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -513,7 +526,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null); } return chain; @@ -538,8 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -698,8 +711,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. + */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); @@ -1110,4 +1138,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { runOnPendingTransactionsComplete = null; } } + + ShardStats getStats() { + return shard.getShardMBean(); + } }