From: Tom Pantelis Date: Thu, 22 Dec 2016 21:57:22 +0000 (-0500) Subject: BUG-7033: Fix commit exception due to pipe-lining X-Git-Tag: release/carbon~340 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=823bd74f34ee1c651f1f90daeef386a35c68d431;hp=3ed71a2888cb0e38096d9bf15b78213948d1f328 BUG-7033: Fix commit exception due to pipe-lining The DistributedDataStoreRemotingIntegrationTest#testTransactionWithIsolatedLeader has failed sporadically on commit with the "Store tree X and candidate base Y differ" error due to an edge case bug with the pipe-lining. Basically this occurs if tx 1 is pending completion of replication and tx 2 is progressed to the COMMIT_PENDING state but the associated DataTreeCandidate has ModificationType.UNMODIFIED. In that case we elide replication and proceed immediately to finishCommit which results in the error due to tx 2 committing before tx 1. To fix it, I added a new FINISH_COMMIT_PENDING state and call payloadReplicationComplete, which checks the head of the pendingFinishCommits queue, when replication is elided. 
Change-Id: I5a0d033df131c9c3f4e24670a02a971dec331a4d Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 31198b98f2..78b49a60ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -36,6 +36,7 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; @@ -380,7 +381,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } if (!current.cohort.getIdentifier().equals(txId)) { - LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); return; } @@ -560,21 +561,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { - while (!pendingTransactions.isEmpty()) { - final CommitEntry entry = pendingTransactions.peek(); + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - if (cohort.isFailed()) { - LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - 
pendingTransactions.remove(); - continue; - } - - if (cohort.getState() != State.CAN_COMMIT_PENDING) { - break; - } - LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { @@ -603,24 +593,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one pendingTransactions.poll().cohort.failedCanCommit(cause); - } + }); + } - maybeRunOperationOnPendingTransactionsComplete(); + private void processNextPending() { + processNextPendingFinishCommit(); + processNextPendingCommit(); + processNextPendingTransaction(); } - private void processNextPendingCommit() { - while (!pendingCommits.isEmpty()) { - final CommitEntry entry = pendingCommits.peek(); + private void processNextPending(Queue queue, State allowedState, Consumer processor) { + while (!queue.isEmpty()) { + final CommitEntry entry = queue.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; if (cohort.isFailed()) { LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - pendingCommits.remove(); + queue.remove(); continue; } - if (cohort.getState() == State.COMMIT_PENDING) { - startCommit(cohort, cohort.getCandidate()); + if (cohort.getState() == allowedState) { + processor.accept(entry); } break; @@ -629,16 +623,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { maybeRunOperationOnPendingTransactionsComplete(); } + private void processNextPendingCommit() { + processNextPending(pendingCommits, State.COMMIT_PENDING, + entry -> startCommit(entry.cohort, entry.cohort.getCandidate())); + } + + private void processNextPendingFinishCommit() { + processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING, + entry -> payloadReplicationComplete(entry.cohort.getIdentifier())); + } + private boolean peekNextPendingCommit() { final CommitEntry first = pendingCommits.peek(); return first != null 
&& first.cohort.getState() == State.COMMIT_PENDING; } - private void processNextPending() { - processNextPendingCommit(); - processNextPendingTransaction(); - } - void startCanCommit(final SimpleShardDataTreeCohort cohort) { final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; if (!cohort.equals(current)) { @@ -739,15 +738,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); + final TransactionIdentifier txId = cohort.getIdentifier(); if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { LOG.debug("{}: No replication required, proceeding to finish commit", logContext); pendingCommits.remove(); pendingFinishCommits.add(entry); - finishCommit(cohort); + cohort.finishCommitPending(); + payloadReplicationComplete(txId); return; } - final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 6cb9badd8d..deec9e5adc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -25,6 +25,7 @@ public abstract class ShardDataTreeCohort implements Identifiable + snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); + + final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot -> + snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer())); + + final FutureCallback commitCallback1 
= immediate3PhaseCommit(cohort1); + + verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), eq(false)); + + final FutureCallback commitCallback2 = immediate3PhaseCommit(cohort2); + + verify(mockShard, never()).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), + anyBoolean()); + + // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. + shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), + CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate())); + + InOrder inOrder = inOrder(commitCallback1, commitCallback2); + inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class)); + inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class)); + + final DataTreeSnapshot snapshot = + shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot(); + Optional> optional = snapshot.readNode(CarsModel.BASE_PATH); + assertEquals("Car node present", true, optional.isPresent()); + } + @SuppressWarnings("unchecked") @Test public void testAbortWithPendingCommits() throws Exception {