BUG-8056: place an upper bound on number of transactions processed 85/56985/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 12 May 2017 13:24:07 +0000 (15:24 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 12 May 2017 21:35:33 +0000 (21:35 +0000)
When transactions complete their preCommit step immediately we end
up scheduling the next transaction immediately in the call stack,
hence if that completes immediately we end up eating away our stack
until we hit StackOverflowError.

Limit the number of transactions we process as a reaction to a single
message so that stack usage is under control. Should we hit this
limit, schedule a contiuation, which will deal with the rest of
the transactions.

Change-Id: Iad2812c823bd8e91ad45020ac50f6a8626654afb
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 2b4ff4845c360d66c4a4f86478cd03564484a086..fbd5b6456a79ae7c94a140008fda80db952d1dbc 100644 (file)
@@ -125,6 +125,13 @@ public class Shard extends RaftActor {
         }
     };
 
+    static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+        @Override
+        public String toString() {
+            return "resumeNextPendingTransaction";
+        }
+    };
+
     // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
     public static final String DEFAULT_NAME = "default";
 
@@ -275,6 +282,8 @@ public class Shard extends RaftActor {
                     maybeError.get());
             }
 
+            store.resetTransactionBatch();
+
             if (message instanceof RequestEnvelope) {
                 final long now = ticker().read();
                 final RequestEnvelope envelope = (RequestEnvelope)message;
@@ -345,6 +354,8 @@ public class Shard extends RaftActor {
                 persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
+            } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+                store.resumeNextPendingTransaction();
             } else {
                 super.handleNonRaftCommand(message);
             }
@@ -994,4 +1005,8 @@ public class Shard extends RaftActor {
     Ticker ticker() {
         return Ticker.systemTicker();
     }
+
+    void scheduleNextPendingTransaction() {
+        self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+    }
 }
index e36bc98f99115b32c36d85010cbf088b98533225..dcf632d215e930fa2dbc1be3af65e5bb02645298 100644 (file)
@@ -106,8 +106,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -136,6 +143,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -188,6 +197,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -699,8 +712,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();