BUG-7033: Fix commit exception due to pipe-lining
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 064f6f5d8a476c6451ff9b4bd9bce0c4344549e0..78b49a60ae5cef444b28633fd9b8fe397d117914 100644 (file)
@@ -36,6 +36,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -380,7 +381,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+            LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             return;
         }
@@ -537,8 +538,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
-        Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size() + pendingCommits.size()
-                + pendingFinishCommits.size());
+        Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
 
         for (CommitEntry entry: pendingFinishCommits) {
             ret.add(entry.cohort);
@@ -561,21 +561,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
-        while (!pendingTransactions.isEmpty()) {
-            final CommitEntry entry = pendingTransactions.peek();
+        processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if (cohort.isFailed()) {
-                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingTransactions.remove();
-                continue;
-            }
-
-            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
-                break;
-            }
-
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
@@ -604,24 +593,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
             pendingTransactions.poll().cohort.failedCanCommit(cause);
-        }
+        });
+    }
 
-        maybeRunOperationOnPendingTransactionsComplete();
+    private void processNextPending() {
+        processNextPendingFinishCommit();
+        processNextPendingCommit();
+        processNextPendingTransaction();
     }
 
-    private void processNextPendingCommit() {
-        while (!pendingCommits.isEmpty()) {
-            final CommitEntry entry = pendingCommits.peek();
+    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+        while (!queue.isEmpty()) {
+            final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
 
             if (cohort.isFailed()) {
                 LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingCommits.remove();
+                queue.remove();
                 continue;
             }
 
-            if (cohort.getState() == State.COMMIT_PENDING) {
-                startCommit(cohort, cohort.getCandidate());
+            if (cohort.getState() == allowedState) {
+                processor.accept(entry);
             }
 
             break;
@@ -630,9 +623,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
-    private void processNextPending() {
-        processNextPendingCommit();
-        processNextPendingTransaction();
+    private void processNextPendingCommit() {
+        processNextPending(pendingCommits, State.COMMIT_PENDING,
+            entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
+    }
+
+    private void processNextPendingFinishCommit() {
+        processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
+            entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
+    }
+
+    private boolean peekNextPendingCommit() {
+        final CommitEntry first = pendingCommits.peek();
+        return first != null && first.cohort.getState() == State.COMMIT_PENDING;
     }
 
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
@@ -658,17 +661,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         final SimpleShardDataTreeCohort current = entry.cohort;
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+
+        LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
-        } catch (Exception e) {
-            failPreCommit(e);
-            return;
-        }
-
-        try {
             cohort.userPreCommit(candidate);
-        } catch (ExecutionException | TimeoutException e) {
+        } catch (ExecutionException | TimeoutException | RuntimeException e) {
             failPreCommit(e);
             return;
         }
@@ -680,6 +680,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
+
+        LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
         cohort.successfulPreCommit(candidate);
 
         processNextPendingTransaction();
@@ -733,34 +736,56 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+
+        final TransactionIdentifier txId = cohort.getIdentifier();
         if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
             LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
             pendingCommits.remove();
             pendingFinishCommits.add(entry);
-            finishCommit(cohort);
+            cohort.finishCommitPending();
+            payloadReplicationComplete(txId);
             return;
         }
 
-        final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
-
-            // Once completed, we will continue via payloadReplicationComplete
-            entry.lastAccess = shard.ticker().read();
-            shard.persistPayload(txId, payload);
-
-            LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
-
-            pendingCommits.remove();
-            pendingFinishCommits.add(entry);
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
+            processNextPending();
             return;
         }
 
-        processNextPending();
+        // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent
+        // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for
+        // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that
+        // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called,
+        // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the
+        // pendingCommits queue.
+        processNextPendingTransaction();
+
+        // After processing next pending transactions, we can now remove the current transaction from pendingCommits.
+        // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction
+        // in order to properly determine the batchHint flag for the call to persistPayload.
+        pendingCommits.remove();
+        pendingFinishCommits.add(entry);
+
+        // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with
+        // this transaction for replication.
+        boolean replicationBatchHint = peekNextPendingCommit();
+
+        // Once completed, we will continue via payloadReplicationComplete
+        shard.persistPayload(txId, payload, replicationBatchHint);
+
+        entry.lastAccess = shard.ticker().read();
+
+        LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+
+        // Process the next transaction pending commit, if any. If there is one it will be batched with this
+        // transaction for replication.
+        processNextPendingCommit();
     }
 
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
@@ -882,7 +907,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 return true;
             } else {
-                newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), dataTree);
+                newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
             }
         }