From: Tom Pantelis Date: Tue, 3 Jan 2017 12:53:08 +0000 (-0500) Subject: BUG-7033: Remove payload replication short-circuits X-Git-Tag: release/carbon~337 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c4f9bb0e408bfaff5c1730e574e8ef1ebe80ac7b BUG-7033: Remove payload replication short-circuits Removed the code paths in ShardDataTree#startCommit whereby it short-circuits the call to persistPayload. This simplifies the code in ShardDataTree. For UNMODIFIED DataTreeCandidates we still want to replicate so followers have an accurate view of transaction states. For the case where persistence is disabled and there are no followers, I modified persistPayload to check those conditions and short-circuit. Change-Id: Ifb940f950b076b5f9cedd91c42d3e065db92a7c5 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 90c47256b4..84c399deb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -450,16 +450,15 @@ public class Shard extends RaftActor { updateConfigParams(datastoreContext.getShardRaftConfig()); } - boolean canSkipPayload() { - // If we do not have any followers and we are not using persistence we can apply modification to the state - // immediately - return !hasFollowers() && !persistence().isRecoveryApplicable(); - } - // applyState() will be invoked once consensus is reached on the payload void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) { - // We are faking the sender - persistData(self(), transactionId, payload, batchHint); + boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + if (canSkipPayload) { + applyState(self(), transactionId, payload); + } else { + // We are faking the sender + persistData(self(), transactionId, payload, batchHint); + } } private void handleCommitTransaction(final CommitTransaction commit) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 78b49a60ae..282191216d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -69,7 +69,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; @@ -597,7 +596,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private void processNextPending() { - processNextPendingFinishCommit(); processNextPendingCommit(); processNextPendingTransaction(); } @@ -628,11 +626,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { entry -> startCommit(entry.cohort, entry.cohort.getCandidate())); } - private void processNextPendingFinishCommit() { - processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING, - entry -> payloadReplicationComplete(entry.cohort.getIdentifier())); - } - private boolean peekNextPendingCommit() { final CommitEntry first = pendingCommits.peek(); return first != null && first.cohort.getState() == State.COMMIT_PENDING; @@ -739,15 +732,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); final TransactionIdentifier txId = cohort.getIdentifier(); - if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { - LOG.debug("{}: No replication required, proceeding to finish commit", logContext); - pendingCommits.remove(); - pendingFinishCommits.add(entry); - cohort.finishCommitPending(); - payloadReplicationComplete(txId); - return; - } - final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index deec9e5adc..6cb9badd8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -25,7 +25,6 @@ public abstract class ShardDataTreeCohort implements Identifiable { + shardDataTree.applyReplicatedPayload(invocation.getArgumentAt(0, TransactionIdentifier.class), + invocation.getArgumentAt(1, Payload.class)); + return null; + }).when(mockShard).persistPayload(any(TransactionIdentifier.class), any(CommitTransactionPayload.class), + anyBoolean()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index 9c7442bbbc..42d0337ba9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -29,6 +29,7 @@ import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit; import com.google.common.base.Optional; @@ -73,7 +74,6 @@ public class ShardDataTreeTest extends AbstractTest { @Before public void setUp() { - doReturn(true).when(mockShard).canSkipPayload(); doReturn(Ticker.systemTicker()).when(mockShard).ticker(); doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean(); @@ -94,6 +94,7 @@ public class ShardDataTreeTest extends AbstractTest { private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) throws ExecutionException, InterruptedException { + immediatePayloadReplication(shardDataTree, mockShard); assertEquals(fullSchema, shardDataTree.getSchemaContext()); @@ -134,6 +135,8 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException { + immediatePayloadReplication(shardDataTree, mockShard); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); @@ -149,6 +152,8 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException { + immediatePayloadReplication(shardDataTree, mockShard); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); @@ -166,6 +171,8 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void testListenerNotifiedOnApplySnapshot() throws Exception { + immediatePayloadReplication(shardDataTree, mockShard); + DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class); shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener); @@ -184,6 +191,7 @@ public class ShardDataTreeTest extends AbstractTest { }); ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL); + immediatePayloadReplication(newDataTree, mockShard); addCar(newDataTree, "optima"); addCar(newDataTree, "murano"); @@ -206,8 +214,6 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception { - doReturn(false).when(mockShard).canSkipPayload(); - final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); @@ -303,8 +309,6 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void testPipelinedTransactionsWithImmediateCommits() throws Exception { - doReturn(false).when(mockShard).canSkipPayload(); - final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); @@ -347,31 +351,27 @@ public class ShardDataTreeTest extends AbstractTest { } @Test - public void testPipelinedTransactionsWithUnmodifiedCandidate() throws Exception { - doReturn(false).when(mockShard).canSkipPayload(); + public void testPipelinedTransactionsWithImmediateReplication() throws Exception { + immediatePayloadReplication(shardDataTree, mockShard); final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot -> - snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer())); - - final FutureCallback commitCallback1 = immediate3PhaseCommit(cohort1); + snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode())); - verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), eq(false)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100")); + final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode)); + final FutureCallback commitCallback1 = immediate3PhaseCommit(cohort1); final FutureCallback commitCallback2 = immediate3PhaseCommit(cohort2); + final FutureCallback commitCallback3 = immediate3PhaseCommit(cohort3); - verify(mockShard, never()).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), - anyBoolean()); - - // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. - shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), - CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate())); - - InOrder inOrder = inOrder(commitCallback1, commitCallback2); + InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3); inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class)); inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class)); + inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class)); final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot(); @@ -382,8 +382,6 @@ public class ShardDataTreeTest extends AbstractTest { @SuppressWarnings("unchecked") @Test public void testAbortWithPendingCommits() throws Exception { - doReturn(false).when(mockShard).canSkipPayload(); - final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); @@ -441,6 +439,8 @@ public class ShardDataTreeTest extends AbstractTest { @SuppressWarnings("unchecked") @Test public void testAbortWithFailedRebase() throws Exception { + immediatePayloadReplication(shardDataTree, mockShard); + final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index b1e31380a4..ddd2cb0327 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -63,7 +63,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -76,7 +75,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; @@ -1154,67 +1152,6 @@ public class ShardTest extends AbstractShardTest { }; } - @Test - public void testReadWriteCommitWhenTransactionHasNoModifications() { - testCommitWhenTransactionHasNoModifications(true); - } - - @Test - public void testWriteOnlyCommitWhenTransactionHasNoModifications() { - testCommitWhenTransactionHasNoModifications(false); - } - - private void testCommitWhenTransactionHasNoModifications(final boolean readWrite) { - // Note that persistence is enabled which would normally result in the - // entry getting written to the journal - // but here that need not happen - new ShardTestKit(getSystem()) { - { - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasNoModifications-" + readWrite); - - waitUntilLeader(shard); - - final TransactionIdentifier transactionID = nextTransactionId(); - - final FiniteDuration duration = duration("5 seconds"); - - if (readWrite) { - final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore() - .newReadWriteTransaction(transactionID); - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef()); - } else { - shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), - getRef()); - } - - expectMsgClass(duration, ReadyTransactionReply.class); - - // Send the CanCommitTransaction message. - - shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply - .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.class); - - shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); - final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); - - // Use MBean for verification - // Committed transaction count should increase as usual - assertEquals(1, shardStats.getCommittedTransactionsCount()); - - // Commit index should not advance because this does not go into - // the journal - assertEquals(-1, shardStats.getCommitIndex()); - } - }; - } - @Test public void testReadWriteCommitWhenTransactionHasModifications() throws Exception { testCommitWhenTransactionHasModifications(true);