BUG-8371: Respond to CreateLocalHistoryRequest after replication
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index e8469d439d7df19b6fc5828574492beadf5109f1..ed36813d49a06f94d11678d626183aeb04288a97 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
@@ -63,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -108,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -138,7 +144,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
-    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
+    private int currentTransactionBatch;
+
+    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
@@ -153,18 +161,20 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         tip = dataTree;
     }
 
-    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
             final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final ShardDataTreeMetadata<?>... metadata) {
         this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
-                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
+                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata);
     }
 
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""),
+                new DefaultShardDataChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
@@ -188,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -253,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
-        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
     /**
@@ -381,51 +395,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
+            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                applyReplicatedCandidate(e.getKey(), e.getValue());
-                allMetadataCommittedTransaction(e.getKey());
+                txId = e.getKey();
+                applyReplicatedCandidate(txId, e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                txId = (TransactionIdentifier) identifier;
+                payloadReplicationComplete(txId);
             }
+            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
-            } else {
-                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
             }
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
-            } else {
-                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
-            }
-        } else if (payload instanceof CloseLocalHistoryPayload) {
-            if (identifier != null) {
-                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
-            } else {
-                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
-            } else {
-                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -517,12 +523,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+            @Nullable final Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+        } else if (callback != null) {
+            callback.run();
         }
 
         return chain;
@@ -533,7 +542,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -542,32 +551,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     .newModification());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
     }
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
-        treeChangeListenerPublisher.publishChanges(candidate, logContext);
-        dataChangeListenerPublisher.publishChanges(candidate, logContext);
-    }
-
-    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
-            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
-                    listenerReg.getScope());
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
-    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
-            final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
-            localPublisher.registerTreeChangeListener(path, listener);
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
+        treeChangeListenerPublisher.publishChanges(candidate);
+        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
@@ -622,29 +612,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
-                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-                    final DataChangeScope scope) {
-        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    void registerDataChangeListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
+            final Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
     }
 
-    private Optional<DataTreeCandidate> readCurrentData() {
+    Optional<DataTreeCandidate> readCurrentData() {
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
-            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg =
-                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+            final Optional<DataTreeCandidate> initialState,
+            final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
     int getQueueSize() {
@@ -658,11 +644,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, AbortTransactionPayload.create(id), callback);
     }
 
-
     @Override
-    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
-        LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+        // No-op for free-standing transactions
+
     }
 
     @Override
@@ -673,6 +658,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return createReadyCohort(transaction.getIdentifier(), snapshot);
     }
 
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        LOG.debug("{}: purging transaction {}", logContext, id);
+        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+    }
+
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
         return dataTree.takeSnapshot().readNode(path);
     }
@@ -726,8 +716,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
@@ -735,6 +740,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
+                cohort.throwCanCommitFailure();
+
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
@@ -950,14 +957,31 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification modification) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        return cohort;
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
         pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
     }
 
+    // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
+    // the newReadWriteTransaction()
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        if (txId.getHistoryId().getHistoryId() == 0) {
+            return createReadyCohort(txId, mod);
+        }
+
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+    }
+
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
@@ -1118,4 +1142,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             runOnPendingTransactionsComplete = null;
         }
     }
+
+    ShardStats getStats() {
+        return shard.getShardMBean();
+    }
 }