BUG-8159: apply object lifecycle to metadata
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index be4aa42669ff77fb653f59082f664dc85deaf7eb..ae09f1da78394fafd2ae0820b14386519978546b 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -64,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -109,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -139,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -166,7 +173,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""),
+                new DefaultShardDataChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
     }
 
     final String logContext() {
@@ -190,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -255,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
-        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
     /**
     }
 
     /**
@@ -383,45 +395,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
+            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                applyReplicatedCandidate(e.getKey(), e.getValue());
-                allMetadataCommittedTransaction(e.getKey());
+                txId = e.getKey();
+                applyReplicatedCandidate(txId, e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                txId = (TransactionIdentifier) identifier;
+                payloadReplicationComplete(txId);
             }
             }
+            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
-            } else {
-                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
-            } else {
-                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
-            } else {
-                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
-            } else {
-                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
             }
             }
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -518,7 +528,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
         }
 
         return chain;
         }
 
         return chain;
@@ -543,27 +553,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
-        treeChangeListenerPublisher.publishChanges(candidate, logContext);
-        dataChangeListenerPublisher.publishChanges(candidate, logContext);
-    }
-
-    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
-            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
-                    listenerReg.getScope());
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
-    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
-            final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
-            localPublisher.registerTreeChangeListener(path, listener);
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
+        treeChangeListenerPublisher.publishChanges(candidate);
+        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
     }
 
     /**
@@ -618,29 +609,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
-                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-                    final DataChangeScope scope) {
-        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    void registerDataChangeListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
+            final Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
     }
 
     }
 
-    private Optional<DataTreeCandidate> readCurrentData() {
+    Optional<DataTreeCandidate> readCurrentData() {
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
-            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg =
-                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+            final Optional<DataTreeCandidate> initialState,
+            final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
     int getQueueSize() {
     }
 
     int getQueueSize() {
@@ -726,8 +713,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
@@ -951,7 +953,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         cohortRegistry.process(sender, message);
     }
 
         cohortRegistry.process(sender, message);
     }
 
-
     @Override
     ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
     @Override
     ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
@@ -968,6 +969,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return cohort;
     }
 
         return cohort;
     }
 
+    // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
+    // the newReadWriteTransaction()
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        if (txId.getHistoryId().getHistoryId() == 0) {
+            return createReadyCohort(txId, mod);
+        }
+
+        return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+    }
+
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);