BUG-8538: rework transaction abort paths
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index b5ee4cc1ab0fae0ddc5074d00c47dba196ebba6c..ed36813d49a06f94d11678d626183aeb04288a97 100644 (file)
@@ -107,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -137,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -164,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""),
+                new DefaultShardDataChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
     }
 
     final String logContext() {
@@ -188,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -253,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
-        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
     /**
     }
 
     /**
@@ -381,45 +395,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
+            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                applyReplicatedCandidate(e.getKey(), e.getValue());
-                allMetadataCommittedTransaction(e.getKey());
+                txId = e.getKey();
+                applyReplicatedCandidate(txId, e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                txId = (TransactionIdentifier) identifier;
+                payloadReplicationComplete(txId);
             }
             }
+            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
-            } else {
-                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
-            } else {
-                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
-            } else {
-                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
-            } else {
-                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -511,12 +523,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+            @Nullable final Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+        } else if (callback != null) {
+            callback.run();
         }
 
         return chain;
         }
 
         return chain;
@@ -527,7 +542,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -536,13 +551,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     .newModification());
         }
 
                     .newModification());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
     }
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
     }
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
-        treeChangeListenerPublisher.publishChanges(candidate, logContext);
-        dataChangeListenerPublisher.publishChanges(candidate, logContext);
+        treeChangeListenerPublisher.publishChanges(candidate);
+        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
     }
 
     /**
@@ -701,8 +716,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
@@ -949,7 +979,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return createReadyCohort(txId, mod);
         }
 
             return createReadyCohort(txId, mod);
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")