From: Tom Pantelis Date: Fri, 16 Dec 2016 21:05:15 +0000 (-0500) Subject: BUG-7033: Implement batchHint in ShardDataTree X-Git-Tag: release/carbon~342 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=29aa3b83db308b1a511857db8d918e0f2e629407;ds=sidebyside BUG-7033: Implement batchHint in ShardDataTree Modified ShardDataTree to properly compute the batchHint flag that is passed to persistData. Basically if the next transaction in the pendingCommits queue is in the COMMIT_PENDING state then batchHint is set to true. After the call to persistData the next transaction in the pendingCommits queue is processed which, if COMMIT_PENDING, will be batched for replication with the previous transaction. Change-Id: I938e98391c69d617901e7e179c93e066667c017d Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 23a5a0e20b..90c47256b4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -457,9 +457,9 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload - void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { + void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) { // We are faking the sender - persistData(self(), transactionId, payload, false); + persistData(self(), transactionId, payload, batchHint); } private void handleCommitTransaction(final CommitTransaction commit) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 064f6f5d8a..9d65d993bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -630,6 +630,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { maybeRunOperationOnPendingTransactionsComplete(); } + private boolean peekNextPendingCommit() { + final CommitEntry first = pendingCommits.peek(); + return first != null && first.cohort.getState() == State.COMMIT_PENDING; + } + private void processNextPending() { processNextPendingCommit(); processNextPendingTransaction(); @@ -658,17 +663,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort current = entry.cohort; Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); + + LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier()); + final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - } catch (Exception e) { - failPreCommit(e); - return; - } - - try { cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException e) { + } catch (ExecutionException | TimeoutException | RuntimeException e) { failPreCommit(e); return; } @@ -680,6 +682,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { pendingTransactions.remove(); pendingCommits.add(entry); + + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + cohort.successfulPreCommit(candidate); processNextPendingTransaction(); @@ -733,6 +738,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); + if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { LOG.debug("{}: No replication required, proceeding to finish commit", logContext); pendingCommits.remove(); @@ -745,22 +752,41 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate); - - // Once completed, we will continue via payloadReplicationComplete - entry.lastAccess = shard.ticker().read(); - shard.persistPayload(txId, payload); - - LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); - - pendingCommits.remove(); - pendingFinishCommits.add(entry); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); pendingCommits.poll().cohort.failedCommit(e); + processNextPending(); return; } - processNextPending(); + // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent + // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for + // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that + // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called, + // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the + // pendingCommits queue. + processNextPendingTransaction(); + + // After processing next pending transactions, we can now remove the current transaction from pendingCommits. + // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction + // in order to properly determine the batchHint flag for the call to persistPayload. + pendingCommits.remove(); + pendingFinishCommits.add(entry); + + // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with + // this transaction for replication. + boolean replicationBatchHint = peekNextPendingCommit(); + + // Once completed, we will continue via payloadReplicationComplete + shard.persistPayload(txId, payload, replicationBatchHint); + + entry.lastAccess = shard.ticker().read(); + + LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); + + // Process the next transaction pending commit, if any. If there is one it will be batched with this + // transaction for replication. + processNextPendingCommit(); } void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java index d273910b6d..6696df0781 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java @@ -87,6 +87,29 @@ public final class ShardDataTreeMocking { return callback; } + public static FutureCallback immediate3PhaseCommit(final ShardDataTreeCohort cohort) { + final FutureCallback commitCallback = mockCallback(); + doNothing().when(commitCallback).onSuccess(any(UnsignedLong.class)); + doNothing().when(commitCallback).onFailure(any(Throwable.class)); + + final FutureCallback preCommitCallback = mockCallback(); + doAnswer(invocation -> { + cohort.commit(commitCallback); + return null; + }).when(preCommitCallback).onSuccess(any(DataTreeCandidate.class)); + doNothing().when(preCommitCallback).onFailure(any(Throwable.class)); + + final FutureCallback canCommit = mockCallback(); + doAnswer(invocation -> { + cohort.preCommit(preCommitCallback); + return null; + }).when(canCommit).onSuccess(null); + doNothing().when(canCommit).onFailure(any(Throwable.class)); + + cohort.canCommit(canCommit); + return commitCallback; + } + @SuppressWarnings("unchecked") private static Object invokeSuccess(final InvocationOnMock invocation, final T value) { invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index 6b22426c76..6f0b32b450 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; @@ -25,6 +26,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit; @@ -203,7 +205,7 @@ public class ShardDataTreeTest extends AbstractTest { } @Test - public void testPipelinedTransactions() throws Exception { + public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception { doReturn(false).when(mockShard).canSkipPayload(); final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> @@ -241,26 +243,30 @@ public class ShardDataTreeTest extends AbstractTest { verify(preCommitCallback4).onSuccess(cohort4.getCandidate()); final FutureCallback commitCallback2 = coordinatedCommit(cohort2); - verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class)); + verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + anyBoolean()); verifyNoMoreInteractions(commitCallback2); final FutureCallback commitCallback4 = coordinatedCommit(cohort4); - verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class)); + verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + anyBoolean()); verifyNoMoreInteractions(commitCallback4); final FutureCallback commitCallback1 = coordinatedCommit(cohort1); InOrder inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verifyNoMoreInteractions(); + inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + eq(true)); + inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); verifyNoMoreInteractions(commitCallback1); verifyNoMoreInteractions(commitCallback2); final FutureCallback commitCallback3 = coordinatedCommit(cohort3); inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verifyNoMoreInteractions(); + inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + eq(true)); + inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); verifyNoMoreInteractions(commitCallback3); verifyNoMoreInteractions(commitCallback4); @@ -295,6 +301,51 @@ public class ShardDataTreeTest extends AbstractTest { assertEquals("People node", peopleNode, optional.get()); } + @Test + public void testPipelinedTransactionsWithImmediateCommits() throws Exception { + doReturn(false).when(mockShard).canSkipPayload(); + + final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot -> + snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer())); + + final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot -> + snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode())); + + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100")); + final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode)); + + final FutureCallback commitCallback2 = immediate3PhaseCommit(cohort2); + final FutureCallback commitCallback3 = immediate3PhaseCommit(cohort3); + final FutureCallback commitCallback1 = immediate3PhaseCommit(cohort1); + + InOrder inOrder = inOrder(mockShard); + inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + eq(true)); + inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), + eq(true)); + inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); + + // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. + CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(), + cohort1.getCandidate()); + shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload); + + inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3); + inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class)); + inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class)); + inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class)); + + final DataTreeSnapshot snapshot = + shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot(); + Optional> optional = snapshot.readNode(carPath); + assertEquals("Car node present", true, optional.isPresent()); + assertEquals("Car node", carNode, optional.get()); + } + @SuppressWarnings("unchecked") @Test public void testAbortWithPendingCommits() throws Exception { @@ -333,10 +384,12 @@ public class ShardDataTreeTest extends AbstractTest { coordinatedCommit(cohort4); InOrder inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class)); - inOrder.verifyNoMoreInteractions(); + inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); + inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); + inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + eq(false)); // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 562a23765a..b1e31380a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1520,14 +1520,15 @@ public class ShardTest extends AbstractShardTest { { final Creator creator = () -> new Shard(newShardBuilder()) { @Override - void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { + void persistPayload(final TransactionIdentifier transactionId, final Payload payload, + boolean batchHint) { // Simulate an AbortTransaction message occurring during // replication, after // persisting and before finishing the commit to the // in-memory store. doAbortTransaction(transactionId, null); - super.persistPayload(transactionId, payload); + super.persistPayload(transactionId, payload, batchHint); } };