Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 6d9f78aeabdf82e0e6d3a530b9bc7e273db5087e..5f1e67e988e60ccca5f434f2e04358cd484ba8b9 100644 (file)
@@ -27,11 +27,14 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -100,6 +103,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             this.cohort = Preconditions.checkNotNull(cohort);
             lastAccess = now;
         }
+
+        @Override
+        public String toString() {
+            return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+        }
     }
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
@@ -115,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final Deque<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
 
@@ -643,11 +651,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getIdentifier(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
@@ -786,13 +795,108 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
         if (!cohort.equals(head.cohort)) {
-            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
-            return;
+            // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+            // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+            // has other participating shards, it could deadlock with other tx's accessing the same shards
+            // depending on the order the tx's are readied on each shard
+            // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+            // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+            // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+            // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+            // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+            // the queue as a result, then proceed with canCommit.
+
+            Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+            if (precedingShardNames.isEmpty()) {
+                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+                return;
+            }
+
+            LOG.debug("{}: Evaluating tx {} for canCommit -  preceding participating shard names {}",
+                    logContext, cohort.getIdentifier(), precedingShardNames);
+            final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+            int index = -1;
+            int moveToIndex = -1;
+            while (iter.hasNext()) {
+                final CommitEntry entry = iter.next();
+                ++index;
+
+                if (cohort.equals(entry.cohort)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+                            logContext, cohort.getIdentifier(), moveToIndex);
+                    iter.remove();
+                    insertEntry(pendingTransactions, entry, moveToIndex);
+
+                    if (!cohort.equals(pendingTransactions.peek().cohort)) {
+                        LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+                            logContext, cohort.getIdentifier());
+                    break;
+                }
+
+                if (entry.cohort.getState() != State.READY) {
+                    LOG.debug("{}: Skipping pending transaction {} in state {}",
+                            logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+                    continue;
+                }
+
+                final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+                        entry.cohort.getParticipatingShardNames());
+
+                if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+                                logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+                        moveToIndex = index;
+                    } else {
+                        LOG.debug(
+                            "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+                            logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+                    }
+                } else {
+                    LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+                        logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+                }
+            }
         }
 
         processNextPendingTransaction();
     }
 
+    private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+        if (atIndex == 0) {
+            queue.addFirst(entry);
+            return;
+        }
+
+        LOG.trace("Inserting into Deque at index {}", atIndex);
+
+        Deque<CommitEntry> tempStack = new ArrayDeque<>(atIndex);
+        for (int i = 0; i < atIndex; i++) {
+            tempStack.push(queue.poll());
+        }
+
+        queue.addFirst(entry);
+
+        tempStack.forEach(queue::addFirst);
+    }
+
+    private Collection<String> extractPrecedingShardNames(
+            java.util.Optional<SortedSet<String>> participatingShardNames) {
+        return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
+            set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
+    }
+
     private void failPreCommit(final Throwable cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
@@ -950,22 +1054,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
-                        COMMIT_STEP_TIMEOUT));
+                        COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return createReadyCohort(txId, mod);
+            return createReadyCohort(txId, mod, participatingShardNames);
         }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")