BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 694be4d1d16ac1e723359531fac964908bfdc3f3..7aecda48db6ceaf7ca3a5dbc5ae266847b6676ca 100644 (file)
@@ -719,8 +719,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                cohort.throwCanCommitFailure();
-
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
@@ -787,8 +785,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
-        final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
-        if (!cohort.equals(current)) {
+        final CommitEntry head = pendingTransactions.peek();
+        if (head == null) {
+            LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort);
+            return;
+        }
+        if (!cohort.equals(head.cohort)) {
             LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
             return;
         }
@@ -938,15 +940,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @Override
     ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+        final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification mod) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
@@ -971,21 +972,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
         final CommitEntry currentTx = currentQueue.peek();
         if (currentTx != null && currentTx.lastAccess + timeout < now) {
+            final State state = currentTx.cohort.getState();
             LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
-                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state);
             boolean processNext = true;
-            switch (currentTx.cohort.getState()) {
+            final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+                    + transactionCommitTimeoutMillis + "ms");
+
+            switch (state) {
                 case CAN_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedCanCommit(new TimeoutException());
+                    currentQueue.remove().cohort.failedCanCommit(cohortFailure);
                     break;
                 case CAN_COMMIT_COMPLETE:
                     // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
                     // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
                     // in PRE_COMMIT_COMPLETE is changed.
-                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
+                    currentQueue.remove().cohort.reportFailure(cohortFailure);
                     break;
                 case PRE_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedPreCommit(new TimeoutException());
+                    currentQueue.remove().cohort.failedPreCommit(cohortFailure);
                     break;
                 case PRE_COMMIT_COMPLETE:
                     // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
@@ -1005,7 +1010,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     //        In order to make the pre-commit timer working across failovers, though, we need
                     //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
                     //        restart the timer.
-                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
+                    currentQueue.remove().cohort.reportFailure(cohortFailure);
                     break;
                 case COMMIT_PENDING:
                     LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
@@ -1013,10 +1018,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     currentTx.lastAccess = now;
                     processNext = false;
                     return;
+                case READY:
+                    currentQueue.remove().cohort.reportFailure(cohortFailure);
+                    break;
                 case ABORTED:
                 case COMMITTED:
                 case FAILED:
-                case READY:
                 default:
                     currentQueue.remove();
             }
@@ -1126,4 +1133,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardStats getStats() {
         return shard.getShardMBean();
     }
+
+    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+        return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+            e -> e.cohort).iterator();
+    }
+
+    void removeTransactionChain(final LocalHistoryIdentifier id) {
+        if (transactionChains.remove(id) != null) {
+            LOG.debug("{}: Removed transaction chain {}", logContext, id);
+        }
+    }
 }