+ if (!cohort.equals(head.cohort)) {
+ // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+ // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+ // has other participating shards, it could deadlock with other tx's accessing the same shards
+ // depending on the order the tx's are readied on each shard
+ // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+ // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+ // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+ // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+ // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+ // the queue as a result, then proceed with canCommit.
+
+ Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+ if (precedingShardNames.isEmpty()) {
+ LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
+ logContext, cohort.getIdentifier(), precedingShardNames);
+ final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+ int index = -1;
+ int moveToIndex = -1;
+ while (iter.hasNext()) {
+ final CommitEntry entry = iter.next();
+ ++index;
+
+ if (cohort.equals(entry.cohort)) {
+ if (moveToIndex < 0) {
+ LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+ logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+ logContext, cohort.getIdentifier(), moveToIndex);
+ iter.remove();
+ insertEntry(pendingTransactions, entry, moveToIndex);
+
+ if (!cohort.equals(pendingTransactions.peek().cohort)) {
+ LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+ logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+ logContext, cohort.getIdentifier());
+ break;
+ }
+
+ if (entry.cohort.getState() != State.READY) {
+ LOG.debug("{}: Skipping pending transaction {} in state {}",
+ logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+ continue;
+ }
+
+ final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+ entry.cohort.getParticipatingShardNames());
+
+ if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+ if (moveToIndex < 0) {
+ LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+ moveToIndex = index;
+ } else {
+ LOG.debug(
+ "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+ }
+ } else {
+ LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+ }
+ }
+ }