BUG-7033: Fix commit exception due to pipe-lining
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 89c381a4ca60ef772f3bef5bc9e73d099828787d..78b49a60ae5cef444b28633fd9b8fe397d117914 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -18,6 +19,7 @@ import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Iterables;
 import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
@@ -34,6 +36,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -102,6 +105,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
+    private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
@@ -191,11 +196,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
     }
 
+    private boolean anyPendingTransactions() {
+        return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty();
+    }
+
     private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
             final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
         final Stopwatch elapsed = Stopwatch.createStarted();
 
-        if (!pendingTransactions.isEmpty()) {
+        if (anyPendingTransactions()) {
             LOG.warn("{}: applying state snapshot with pending transactions", logContext);
         }
 
@@ -365,14 +374,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
-        final CommitEntry current = pendingTransactions.peek();
+        final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+            LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             return;
         }
@@ -481,7 +490,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     int getQueueSize() {
-        return pendingTransactions.size();
+        return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
     }
 
     @Override
@@ -529,27 +538,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
-        Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
+        Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
+
+        for (CommitEntry entry: pendingFinishCommits) {
+            ret.add(entry.cohort);
+        }
+
+        for (CommitEntry entry: pendingCommits) {
+            ret.add(entry.cohort);
+        }
+
         for (CommitEntry entry: pendingTransactions) {
             ret.add(entry.cohort);
         }
 
+        pendingFinishCommits.clear();
+        pendingCommits.clear();
         pendingTransactions.clear();
         tip = dataTree;
         return ret;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processNextTransaction() {
-        while (!pendingTransactions.isEmpty()) {
-            final CommitEntry entry = pendingTransactions.peek();
+    private void processNextPendingTransaction() {
+        processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
-                break;
-            }
-
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
@@ -578,11 +593,51 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
             pendingTransactions.poll().cohort.failedCanCommit(cause);
+        });
+    }
+
+    private void processNextPending() {
+        processNextPendingFinishCommit();
+        processNextPendingCommit();
+        processNextPendingTransaction();
+    }
+
+    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+        while (!queue.isEmpty()) {
+            final CommitEntry entry = queue.peek();
+            final SimpleShardDataTreeCohort cohort = entry.cohort;
+
+            if (cohort.isFailed()) {
+                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+                queue.remove();
+                continue;
+            }
+
+            if (cohort.getState() == allowedState) {
+                processor.accept(entry);
+            }
+
+            break;
         }
 
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
+    private void processNextPendingCommit() {
+        processNextPending(pendingCommits, State.COMMIT_PENDING,
+            entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
+    }
+
+    private void processNextPendingFinishCommit() {
+        processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
+            entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
+    }
+
+    private boolean peekNextPendingCommit() {
+        final CommitEntry first = pendingCommits.peek();
+        return first != null && first.cohort.getState() == State.COMMIT_PENDING;
+    }
+
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
         final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
         if (!cohort.equals(current)) {
@@ -590,13 +645,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        processNextTransaction();
+        processNextPendingTransaction();
     }
 
     private void failPreCommit(final Exception cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
-        processNextTransaction();
+        processNextPendingTransaction();
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -606,17 +661,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         final SimpleShardDataTreeCohort current = entry.cohort;
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+
+        LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
-        } catch (Exception e) {
-            failPreCommit(e);
-            return;
-        }
-
-        try {
             cohort.userPreCommit(candidate);
-        } catch (ExecutionException | TimeoutException e) {
+        } catch (ExecutionException | TimeoutException | RuntimeException e) {
             failPreCommit(e);
             return;
         }
@@ -625,13 +677,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         tip = Verify.verifyNotNull(candidate);
 
         entry.lastAccess = shard.ticker().read();
+
+        pendingTransactions.remove();
+        pendingCommits.add(entry);
+
+        LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
         cohort.successfulPreCommit(candidate);
+
+        processNextPendingTransaction();
     }
 
     private void failCommit(final Exception cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
-        pendingTransactions.poll().cohort.failedCommit(cause);
-        processNextTransaction();
+        pendingFinishCommits.poll().cohort.failedCommit(cause);
+        processNextPending();
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -641,6 +701,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
 
+        if (tip == candidate) {
+            // All pending candidates have been committed, reset the tip to the data tree.
+            tip = dataTree;
+        }
+
         try {
             dataTree.commit(candidate);
         } catch (Exception e) {
@@ -649,50 +714,78 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        // All pending candidates have been committed, reset the tip to the data tree
-        if (tip == candidate) {
-            tip = dataTree;
-        }
-
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         // FIXME: propagate journal index
-        pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+        pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
 
         LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
         notifyListeners(candidate);
 
-        processNextTransaction();
+        processNextPending();
     }
 
     void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
-        final CommitEntry entry = pendingTransactions.peek();
+        final CommitEntry entry = pendingCommits.peek();
         Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
 
         final SimpleShardDataTreeCohort current = entry.cohort;
-        Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
+        if (!cohort.equals(current)) {
+            LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+            return;
+        }
+
+        LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
 
+        final TransactionIdentifier txId = cohort.getIdentifier();
         if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
             LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
-            finishCommit(cohort);
+            pendingCommits.remove();
+            pendingFinishCommits.add(entry);
+            cohort.finishCommitPending();
+            payloadReplicationComplete(txId);
             return;
         }
 
-        final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
-            pendingTransactions.poll().cohort.failedCommit(e);
+            pendingCommits.poll().cohort.failedCommit(e);
+            processNextPending();
             return;
         }
 
+        // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent
+        // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for
+        // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that
+        // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called,
+        // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the
+        // pendingCommits queue.
+        processNextPendingTransaction();
+
+        // After processing next pending transactions, we can now remove the current transaction from pendingCommits.
+        // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction
+        // in order to properly determine the batchHint flag for the call to persistPayload.
+        pendingCommits.remove();
+        pendingFinishCommits.add(entry);
+
+        // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with
+        // this transaction for replication.
+        boolean replicationBatchHint = peekNextPendingCommit();
+
         // Once completed, we will continue via payloadReplicationComplete
+        shard.persistPayload(txId, payload, replicationBatchHint);
+
         entry.lastAccess = shard.ticker().read();
-        shard.persistPayload(txId, payload);
+
         LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+
+        // Process the next transaction pending commit, if any. If there is one it will be batched with this
+        // transaction for replication.
+        processNextPendingCommit();
     }
 
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
@@ -712,23 +805,26 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = shard.ticker().read();
-        final CommitEntry currentTx = pendingTransactions.peek();
+
+        final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
+            !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
+        final CommitEntry currentTx = currentQueue.peek();
         if (currentTx != null && currentTx.lastAccess + timeout < now) {
             LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
                     currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
             boolean processNext = true;
             switch (currentTx.cohort.getState()) {
                 case CAN_COMMIT_PENDING:
-                    pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException());
+                    currentQueue.remove().cohort.failedCanCommit(new TimeoutException());
                     break;
                 case CAN_COMMIT_COMPLETE:
                     // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
                     // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
                     // in PRE_COMMIT_COMPLETE is changed.
-                    pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
+                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
                     break;
                 case PRE_COMMIT_PENDING:
-                    pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException());
+                    currentQueue.remove().cohort.failedPreCommit(new TimeoutException());
                     break;
                 case PRE_COMMIT_COMPLETE:
                     // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
@@ -748,7 +844,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     //        In order to make the pre-commit timer working across failovers, though, we need
                     //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
                     //        restart the timer.
-                    pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
+                    currentQueue.remove().cohort.reportFailure(new TimeoutException());
                     break;
                 case COMMIT_PENDING:
                     LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
@@ -761,17 +857,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 case FAILED:
                 case READY:
                 default:
-                    pendingTransactions.remove();
+                    currentQueue.remove();
             }
 
             if (processNext) {
-                processNextTransaction();
+                processNextPending();
             }
         }
     }
 
     boolean startAbort(final SimpleShardDataTreeCohort cohort) {
-        final Iterator<CommitEntry> it = pendingTransactions.iterator();
+        final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
+                pendingTransactions).iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
             return true;
@@ -785,8 +882,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     cohort.getIdentifier());
 
                 it.remove();
-                rebasePreCommittedTransactions(it, dataTree);
-                processNextTransaction();
+                if (cohort.getCandidate() != null) {
+                    rebaseTransactions(it, dataTree);
+                }
+
+                processNextPending();
                 return true;
             }
 
@@ -794,16 +894,20 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return false;
         }
 
-        TipProducingDataTreeTip newTip = dataTree;
+        TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+
                 it.remove();
-                rebasePreCommittedTransactions(it, newTip);
+                if (cohort.getCandidate() != null) {
+                    rebaseTransactions(it, newTip);
+                }
+
                 return true;
             } else {
-                newTip = cohort.getCandidate();
+                newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
             }
         }
 
@@ -812,8 +916,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
-        tip = newTip;
+    private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+        tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
             if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
@@ -849,7 +953,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void maybeRunOperationOnPendingTransactionsComplete() {
-        if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+        if (runOnPendingTransactionsComplete != null && !anyPendingTransactions()) {
             LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
                     runOnPendingTransactionsComplete);