private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
- private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+ /**
+ * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+ * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+ * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+ * result in StackOverflowError.
+ */
+ private static final int MAX_TRANSACTION_BATCH = 100;
+ private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
private SchemaContext schemaContext;
+ private int currentTransactionBatch;
+
ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
+ void resetTransactionBatch() {
+ currentTransactionBatch = 0;
+ }
+
/**
* Take a snapshot of current state for later recovery.
*
return ret;
}
+ /**
+ * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+ */
+ void resumeNextPendingTransaction() {
+ LOG.debug("{}: attempting to resume transaction processing", logContext);
+ processNextPending();
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void processNextPendingTransaction() {
+ ++currentTransactionBatch;
+ if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+ LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+ shard.scheduleNextPendingTransaction();
+ return;
+ }
+
processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();