BUG-8056: place an upper bound on number of transactions processed 85/56985/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 12 May 2017 13:24:07 +0000 (15:24 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 12 May 2017 21:35:33 +0000 (21:35 +0000)
When transactions complete their preCommit step immediately we end
up scheduling the next transaction immediately in the call stack,
hence if that completes immediately we end up eating away our stack
until we hit StackOverflowError.

Limit the number of transactions we process as a reaction to a single
message so that stack usage is under control. Should we hit this
limit, schedule a contiuation, which will deal with the rest of
the transactions.

Change-Id: Iad2812c823bd8e91ad45020ac50f6a8626654afb
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 2b4ff48..fbd5b64 100644 (file)
@@ -125,6 +125,13 @@ public class Shard extends RaftActor {
         }
     };
 
+    static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+        @Override
+        public String toString() {
+            return "resumeNextPendingTransaction";
+        }
+    };
+
     // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
     public static final String DEFAULT_NAME = "default";
 
@@ -275,6 +282,8 @@ public class Shard extends RaftActor {
                     maybeError.get());
             }
 
+            store.resetTransactionBatch();
+
             if (message instanceof RequestEnvelope) {
                 final long now = ticker().read();
                 final RequestEnvelope envelope = (RequestEnvelope)message;
@@ -345,6 +354,8 @@ public class Shard extends RaftActor {
                 persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
+            } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+                store.resumeNextPendingTransaction();
             } else {
                 super.handleNonRaftCommand(message);
             }
@@ -994,4 +1005,8 @@ public class Shard extends RaftActor {
     Ticker ticker() {
         return Ticker.systemTicker();
     }
+
+    void scheduleNextPendingTransaction() {
+        self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+    }
 }
index e36bc98..dcf632d 100644 (file)
@@ -106,8 +106,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -136,6 +143,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -188,6 +197,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -699,8 +712,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.