X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=ae09f1da78394fafd2ae0820b14386519978546b;hp=be4aa42669ff77fb653f59082f664dc85deaf7eb;hb=1663020baaf094c4f2f31a18c787ce4d4efd11c8;hpb=a21acd04ac162c1eb84a997ae0c9ac9df185422c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index be4aa42669..ae09f1da78 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -64,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -109,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); - private final Map transactionChains = new HashMap<>(); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); @@ -139,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; + private int currentTransactionBatch; + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, @@ -166,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { @@ -190,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -255,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); notifyListeners(candidate); - LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + LOG.debug("{}: state snapshot applied in {}", logContext, elapsed); } /** @@ -383,45 +395,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); } + allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); - } else { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { if (identifier != null) { payloadReplicationComplete((PurgeTransactionPayload) payload); - } else { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); - } else { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); - } else { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -518,7 +528,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null); } return chain; @@ -543,27 +553,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); - } - - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -618,29 +609,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -726,8 +713,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. + */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); @@ -951,7 +953,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } - @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { @@ -968,6 +969,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);