From: Robert Varga Date: Fri, 12 May 2017 13:24:07 +0000 (+0200) Subject: BUG-8056: place an upper bound on number of transactions processed X-Git-Tag: release/nitrogen~265 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3613d3cbde81af18aab2d5917491e9f9b3bf63ca BUG-8056: place an upper bound on number of transactions processed When transactions complete their preCommit step immediately we end up scheduling the next transaction immediately in the call stack, hence if that completes immediately we end up eating away our stack until we hit StackOverflowError. Limit the number of transactions we process as a reaction to a single message so that stack usage is under control. Should we hit this limit, schedule a continuation, which will deal with the rest of the transactions. Change-Id: Iad2812c823bd8e91ad45020ac50f6a8626654afb Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 2b4ff4845c..fbd5b6456a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -125,6 +125,13 @@ public class Shard extends RaftActor { } }; + static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() { + @Override + public String toString() { + return "resumeNextPendingTransaction"; + } + }; + // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. 
public static final String DEFAULT_NAME = "default"; @@ -275,6 +282,8 @@ public class Shard extends RaftActor { maybeError.get()); } + store.resetTransactionBatch(); + if (message instanceof RequestEnvelope) { final long now = ticker().read(); final RequestEnvelope envelope = (RequestEnvelope)message; @@ -345,6 +354,8 @@ public class Shard extends RaftActor { persistPayload(txId, AbortTransactionPayload.create(txId), true); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); + } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { + store.resumeNextPendingTransaction(); } else { super.handleNonRaftCommand(message); } @@ -994,4 +1005,8 @@ public class Shard extends RaftActor { Ticker ticker() { return Ticker.systemTicker(); } + + void scheduleNextPendingTransaction() { + self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index e36bc98f99..dcf632d215 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -106,8 +106,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); - private final Map transactionChains = new HashMap<>(); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. 
This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); @@ -136,6 +143,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; + private int currentTransactionBatch; + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, @@ -188,6 +197,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -699,8 +712,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. 
+ */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification();