BUG-5280: update transaction statistics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 1460982bf3d4010fa4558074033dd3df7b510033..8f3a467d55de9cb739b6d24a7e5db6cfa34910cb 100644 (file)
@@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
@@ -106,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -136,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -163,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""),
+                new DefaultShardDataChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
     }
 
     final String logContext() {
@@ -187,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -252,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
-        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
     /**
     }
 
     /**
@@ -378,45 +393,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
+            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                applyReplicatedCandidate(e.getKey(), e.getValue());
-                allMetadataCommittedTransaction(e.getKey());
+                txId = e.getKey();
+                applyReplicatedCandidate(txId, e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                txId = (TransactionIdentifier) identifier;
+                payloadReplicationComplete(txId);
             }
             }
+            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
-            } else {
-                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
-            } else {
-                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
-            } else {
-                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
-            } else {
-                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -513,7 +526,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
         }
 
         return chain;
         }
 
         return chain;
@@ -538,8 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
-        treeChangeListenerPublisher.publishChanges(candidate, logContext);
-        dataChangeListenerPublisher.publishChanges(candidate, logContext);
+        treeChangeListenerPublisher.publishChanges(candidate);
+        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
     }
 
     /**
@@ -698,8 +711,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
@@ -1110,4 +1138,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             runOnPendingTransactionsComplete = null;
         }
     }
             runOnPendingTransactionsComplete = null;
         }
     }
+
+    ShardStats getStats() {
+        return shard.getShardMBean();
+    }
 }
 }