BUG-7033: Fix commit exception due to pipe-lining 62/49762/3
authorTom Pantelis <tpanteli@brocade.com>
Thu, 22 Dec 2016 21:57:22 +0000 (16:57 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 29 Dec 2016 09:48:03 +0000 (09:48 +0000)
The DistributedDataStoreRemotingIntegrationTest#testTransactionWithIsolatedLeader
has failed sporadically on commit with the "Store tree X and candidate
base Y differ" error due to an edge case bug with the pipe-lining. Basically
this occurs if tx 1 is pending completion of replication and tx 2 is progressed
to the COMMIT_PENDING state but the associated DataTreeCandidate has
ModificationType.UNMODIFIED. In that case we elide replication and proceed
immdiately to finishCommit which results in the error due to tx 2 committing
before tx 1. To fix it, I added a new FINISH_COMMIT_PENDING state and call
payloadReplicationComplete, which checks the head of the pendingFinishCommits
queue, when replication is elided..

Change-Id: I5a0d033df131c9c3f4e24670a02a971dec331a4d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java

index 31198b9..78b49a6 100644 (file)
@@ -36,6 +36,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -380,7 +381,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+            LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             return;
         }
@@ -560,21 +561,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
-        while (!pendingTransactions.isEmpty()) {
-            final CommitEntry entry = pendingTransactions.peek();
+        processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if (cohort.isFailed()) {
-                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingTransactions.remove();
-                continue;
-            }
-
-            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
-                break;
-            }
-
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
@@ -603,24 +593,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
             pendingTransactions.poll().cohort.failedCanCommit(cause);
-        }
+        });
+    }
 
-        maybeRunOperationOnPendingTransactionsComplete();
+    private void processNextPending() {
+        processNextPendingFinishCommit();
+        processNextPendingCommit();
+        processNextPendingTransaction();
     }
 
-    private void processNextPendingCommit() {
-        while (!pendingCommits.isEmpty()) {
-            final CommitEntry entry = pendingCommits.peek();
+    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+        while (!queue.isEmpty()) {
+            final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
 
             if (cohort.isFailed()) {
                 LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingCommits.remove();
+                queue.remove();
                 continue;
             }
 
-            if (cohort.getState() == State.COMMIT_PENDING) {
-                startCommit(cohort, cohort.getCandidate());
+            if (cohort.getState() == allowedState) {
+                processor.accept(entry);
             }
 
             break;
@@ -629,16 +623,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
+    private void processNextPendingCommit() {
+        processNextPending(pendingCommits, State.COMMIT_PENDING,
+            entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
+    }
+
+    private void processNextPendingFinishCommit() {
+        processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
+            entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
+    }
+
     private boolean peekNextPendingCommit() {
         final CommitEntry first = pendingCommits.peek();
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
     }
 
-    private void processNextPending() {
-        processNextPendingCommit();
-        processNextPendingTransaction();
-    }
-
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
         final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
         if (!cohort.equals(current)) {
@@ -739,15 +738,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
 
+        final TransactionIdentifier txId = cohort.getIdentifier();
         if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
             LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
             pendingCommits.remove();
             pendingFinishCommits.add(entry);
-            finishCommit(cohort);
+            cohort.finishCommitPending();
+            payloadReplicationComplete(txId);
             return;
         }
 
-        final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
index 8d947e8..197c90a 100644 (file)
@@ -212,6 +212,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         switchState(State.FAILED).onFailure(cause);
     }
 
+    void finishCommitPending() {
+        checkState(State.COMMIT_PENDING);
+        // We want to switch the state but keep the callback.
+        callback = switchState(State.FINISH_COMMIT_PENDING);
+    }
+
     @Override
     public State getState() {
         return state;
index 6f0b32b..9c7442b 100644 (file)
@@ -346,6 +346,39 @@ public class ShardDataTreeTest extends AbstractTest {
         assertEquals("Car node", carNode, optional.get());
     }
 
+    @Test
+    public void testPipelinedTransactionsWithUnmodifiedCandidate() throws Exception {
+        doReturn(false).when(mockShard).canSkipPayload();
+
+        final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
+            snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+        final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
+            snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+        final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
+
+        verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), eq(false));
+
+        final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
+
+        verify(mockShard, never()).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
+                anyBoolean());
+
+        // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
+        shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(),
+                CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()));
+
+        InOrder inOrder = inOrder(commitCallback1, commitCallback2);
+        inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
+        inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
+
+        final DataTreeSnapshot snapshot =
+                shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
+        Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(CarsModel.BASE_PATH);
+        assertEquals("Car node present", true, optional.isPresent());
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testAbortWithPendingCommits() throws Exception {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.