}
};
+ // Sentinel message this actor sends to itself (see scheduleNextPendingTransaction()) to resume
+ // processing of pending transactions after a batched run hit its limit. Matching in
+ // handleNonRaftCommand relies on Object's identity-based equals(), so only this exact instance
+ // triggers resumeNextPendingTransaction().
+ static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+ @Override
+ public String toString() {
+ // Human-readable form for logging/debugging only.
+ return "resumeNextPendingTransaction";
+ }
+ };
+
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
maybeError.get());
}
+ store.resetTransactionBatch();
+
if (message instanceof RequestEnvelope) {
final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
} else {
super.handleNonRaftCommand(message);
}
+ // Time source used for request-arrival timestamps (ticker().read() in handleNonRaftCommand).
+ // Package-private hook returning the system ticker; NOTE(review): presumably overridden by
+ // tests to supply a fake clock — confirm against test code.
Ticker ticker() {
return Ticker.systemTicker();
}
+
+ // Asynchronously re-queues transaction processing by posting the RESUME_NEXT_PENDING_TRANSACTION
+ // sentinel to this actor's own mailbox. Going through the mailbox (rather than calling back
+ // directly) lets the current call stack unwind before the next batch starts.
+ void scheduleNextPendingTransaction() {
+ self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+ }
}
private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
- private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+ /**
+ * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+ * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+ * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+ * result in StackOverflowError.
+ */
+ private static final int MAX_TRANSACTION_BATCH = 100;
+ private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
private SchemaContext schemaContext;
+ // Number of transactions handled by processNextPendingTransaction() in the current batched run.
+ // Compared against MAX_TRANSACTION_BATCH; reset to zero via resetTransactionBatch().
+ private int currentTransactionBatch;
+
+
ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
+ // Resets the batched-run counter consulted by processNextPendingTransaction(). Invoked by the
+ // Shard at the start of each message-handling pass, so MAX_TRANSACTION_BATCH bounds the work
+ // done per actor message rather than over the actor's lifetime.
+ void resetTransactionBatch() {
+ currentTransactionBatch = 0;
+ }
+
/**
* Take a snapshot of current state for later recovery.
*
return ret;
}
+ /**
+ * Called some time after {@link #processNextPendingTransaction()} decides to stop processing
+ * because the current run exceeded MAX_TRANSACTION_BATCH. Reached when the Shard receives the
+ * RESUME_NEXT_PENDING_TRANSACTION sentinel it sent itself, by which point the previous call
+ * stack has unwound, avoiding the StackOverflowError that unbounded batching could cause.
+ */
+ void resumeNextPendingTransaction() {
+ LOG.debug("{}: attempting to resume transaction processing", logContext);
+ processNextPending();
+ }
+
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void processNextPendingTransaction() {
+ ++currentTransactionBatch;
+ if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+ LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+ shard.scheduleNextPendingTransaction();
+ return;
+ }
+
processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();