BUG-8792: allow transactions to not time out after reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 1460982bf3d4010fa4558074033dd3df7b510033..37e65f6a96e5130e53af66372ebb73937cf31332 100644 (file)
@@ -14,7 +14,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -36,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
@@ -106,8 +107,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
-    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    /**
+     * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later
+     * execution to finish up the batch. This is necessary in case of a long list of transactions which progress
+     * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could
+     * result in StackOverflowError.
+     */
+    private static final int MAX_TRANSACTION_BATCH = 100;
 
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
@@ -136,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
+    private int currentTransactionBatch;
+
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
@@ -163,15 +173,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""),
+                new DefaultShardDataChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
         return logContext;
     }
 
-    final Ticker ticker() {
-        return shard.ticker();
+    final long readTime() {
+        return shard.ticker().read();
     }
 
     public TipProducingDataTree getDataTree() {
@@ -187,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
+    void resetTransactionBatch() {
+        currentTransactionBatch = 0;
+    }
+
     /**
      * Take a snapshot of current state for later recovery.
      *
@@ -252,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         notifyListeners(candidate);
 
-        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
     /**
@@ -378,45 +393,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
+            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                applyReplicatedCandidate(e.getKey(), e.getValue());
-                allMetadataCommittedTransaction(e.getKey());
+                txId = e.getKey();
+                applyReplicatedCandidate(txId, e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                txId = (TransactionIdentifier) identifier;
+                payloadReplicationComplete(txId);
             }
+            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
-            } else {
-                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
             }
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeTransactionPayload) payload);
-            } else {
-                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
             }
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
-            } else {
-                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CreateLocalHistoryPayload)payload);
-            } else {
-                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
-            } else {
-                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
             }
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -508,12 +521,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+            @Nullable final Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+        } else if (callback != null) {
+            callback.run();
         }
 
         return chain;
@@ -524,7 +540,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -533,13 +549,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     .newModification());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
     }
 
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
-        treeChangeListenerPublisher.publishChanges(candidate, logContext);
-        dataChangeListenerPublisher.publishChanges(candidate, logContext);
+        treeChangeListenerPublisher.publishChanges(candidate);
+        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
@@ -658,24 +674,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return dataTree.takeSnapshot().newModification();
     }
 
-    /**
-     * Commits a modification.
-     *
-     * @deprecated This method violates DataTree containment and will be removed.
-     */
-    @VisibleForTesting
-    @Deprecated
-    public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
-        // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
-        Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
-
-        modification.ready();
-        dataTree.validate(modification);
-        DataTreeCandidate candidate = dataTree.prepare(modification);
-        dataTree.commit(candidate);
-        return candidate;
-    }
-
     public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
         Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
 
@@ -698,8 +696,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
+    /**
+     * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
+     */
+    void resumeNextPendingTransaction() {
+        LOG.debug("{}: attempting to resume transaction processing", logContext);
+        processNextPending();
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
+        ++currentTransactionBatch;
+        if (currentTransactionBatch > MAX_TRANSACTION_BATCH) {
+            LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch);
+            shard.scheduleNextPendingTransaction();
+            return;
+        }
+
         processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
@@ -707,12 +720,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                cohort.throwCanCommitFailure();
-
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
-                entry.lastAccess = ticker().read();
+                entry.lastAccess = readTime();
                 return;
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
@@ -775,8 +786,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
-        final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
-        if (!cohort.equals(current)) {
+        final CommitEntry head = pendingTransactions.peek();
+        if (head == null) {
+            LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort);
+            return;
+        }
+        if (!cohort.equals(head.cohort)) {
             LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
             return;
         }
@@ -812,7 +827,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = ticker().read();
+        entry.lastAccess = readTime();
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
@@ -926,17 +941,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @Override
     ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
-        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
+        pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification mod) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
-        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
@@ -947,72 +961,107 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return createReadyCohort(txId, mod);
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
-    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+            final Function<SimpleShardDataTreeCohort, Optional<Long>> accessTimeUpdater) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
-        final long now = ticker().read();
+        final long now = readTime();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
         final CommitEntry currentTx = currentQueue.peek();
-        if (currentTx != null && currentTx.lastAccess + timeout < now) {
-            LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
-                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
-            boolean processNext = true;
-            switch (currentTx.cohort.getState()) {
-                case CAN_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedCanCommit(new TimeoutException());
-                    break;
-                case CAN_COMMIT_COMPLETE:
-                    // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
-                    // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
-                    // in PRE_COMMIT_COMPLETE is changed.
-                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
-                    break;
-                case PRE_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedPreCommit(new TimeoutException());
-                    break;
-                case PRE_COMMIT_COMPLETE:
-                    // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
-                    //        are ready we should commit the transaction, not abort it. Our current software stack does
-                    //        not allow us to do that consistently, because we persist at the time of commit, hence
-                    //        we can end up in a state where we have pre-committed a transaction, then a leader failover
-                    //        occurred ... the new leader does not see the pre-committed transaction and does not have
-                    //        a running timer. To fix this we really need two persistence events.
-                    //
-                    //        The first one, done at pre-commit time will hold the transaction payload. When consensus
-                    //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
-                    //        apply the state in this event.
-                    //
-                    //        The second one, done at commit (or abort) time holds only the transaction identifier and
-                    //        signals to followers that the state should (or should not) be applied.
-                    //
-                    //        In order to make the pre-commit timer working across failovers, though, we need
-                    //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
-                    //        restart the timer.
-                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
-                    break;
-                case COMMIT_PENDING:
-                    LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
-                        currentTx.cohort.getIdentifier());
-                    currentTx.lastAccess = now;
-                    processNext = false;
-                    return;
-                case ABORTED:
-                case COMMITTED:
-                case FAILED:
-                case READY:
-                default:
-                    currentQueue.remove();
+        if (currentTx == null) {
+            // Empty queue, no-op
+            return;
+        }
+
+        long delta = now - currentTx.lastAccess;
+        if (delta < timeout) {
+            // Not expired yet, bail
+            return;
+        }
+
+        final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+        if (updateOpt.isPresent()) {
+            final long newAccess =  updateOpt.get().longValue();
+            final long newDelta = now - newAccess;
+            if (newDelta < delta) {
+                LOG.debug("{}: Updated current transaction {} access time", logContext,
+                    currentTx.cohort.getIdentifier());
+                currentTx.lastAccess = newAccess;
+                delta = newDelta;
             }
 
-            if (processNext) {
-                processNextPending();
+            if (delta < timeout) {
+                // Not expired yet, bail
+                return;
             }
         }
+
+        final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta);
+        final State state = currentTx.cohort.getState();
+
+        LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+            currentTx.cohort.getIdentifier(), deltaMillis, state);
+        boolean processNext = true;
+        final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+                + deltaMillis + "ms");
+
+        switch (state) {
+            case CAN_COMMIT_PENDING:
+                currentQueue.remove().cohort.failedCanCommit(cohortFailure);
+                break;
+            case CAN_COMMIT_COMPLETE:
+                // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+                // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+                // in PRE_COMMIT_COMPLETE is changed.
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case PRE_COMMIT_PENDING:
+                currentQueue.remove().cohort.failedPreCommit(cohortFailure);
+                break;
+            case PRE_COMMIT_COMPLETE:
+                // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+                //        are ready we should commit the transaction, not abort it. Our current software stack does
+                //        not allow us to do that consistently, because we persist at the time of commit, hence
+                //        we can end up in a state where we have pre-committed a transaction, then a leader failover
+                //        occurred ... the new leader does not see the pre-committed transaction and does not have
+                //        a running timer. To fix this we really need two persistence events.
+                //
+                //        The first one, done at pre-commit time will hold the transaction payload. When consensus
+                //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+                //        apply the state in this event.
+                //
+                //        The second one, done at commit (or abort) time holds only the transaction identifier and
+                //        signals to followers that the state should (or should not) be applied.
+                //
+                //        In order to make the pre-commit timer working across failovers, though, we need
+                //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+                //        restart the timer.
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case COMMIT_PENDING:
+                LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+                    currentTx.cohort.getIdentifier());
+                currentTx.lastAccess = now;
+                processNext = false;
+                return;
+            case READY:
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case ABORTED:
+            case COMMITTED:
+            case FAILED:
+            default:
+                currentQueue.remove();
+        }
+
+        if (processNext) {
+            processNextPending();
+        }
     }
 
     boolean startAbort(final SimpleShardDataTreeCohort cohort) {
@@ -1110,4 +1159,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             runOnPendingTransactionsComplete = null;
         }
     }
+
+    ShardStats getStats() {
+        return shard.getShardMBean();
+    }
+
+    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+        return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+            e -> e.cohort).iterator();
+    }
+
+    void removeTransactionChain(final LocalHistoryIdentifier id) {
+        if (transactionChains.remove(id) != null) {
+            LOG.debug("{}: Removed transaction chain {}", logContext, id);
+        }
+    }
 }