X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=ac2c07964179192f86b3123f5a9f8a24a8cd2e9d;hp=8278d3cffceaa6b82357c08c4ab62e33ff53fafe;hb=00cc355c0c58e999ffebd531bca3a507e150e441;hpb=12d62e4939a27a3deba065bce79274c9eaf69964 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8278d3cffc..ac2c079641 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -31,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -384,24 +386,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWrite() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test @@ -456,7 +452,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class); @@ -479,48 +475,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMerge() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test public void testDelete() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false); } @Test - public void testReady() throws Exception { + public void testReadyWithReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -550,18 +534,91 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures()); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verifyBatchedModifications(batchedModifications.get(1), true); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test public void testReadyWithRecordingOperationFailure() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectFailedBatchedModifications(actorRef); - expectReadyTransaction(actorRef); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -581,15 +638,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithReplyFailure() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModifications(actorRef, 1); - - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + expectFailedBatchedModifications(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -601,9 +656,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - verifyCohortFutures(proxy, TestException.class); } @@ -634,15 +686,16 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithInvalidReplyMessageType() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModifications(actorRef, 1); + //expectBatchedModifications(actorRef, 1); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + isA(BatchedModifications.class)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -657,17 +710,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, IllegalArgumentException.class); } - @Test - public void testUnusedTransaction() throws Exception { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertEquals("canCommit", true, ready.canCommit().get()); - ready.preCommit().get(); - ready.commit().get(); - } - @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -711,24 +753,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { */ @Test public void testLocalTxActorRead() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1").setTransactionActorPath(actorPath).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); @@ -764,40 +790,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testLocalTxActorReady() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). - setTransactionId("txn-1").setTransactionActorPath(actorPath). - setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, WRITE_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), isA(BatchedModifications.class)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - // testing ready - doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(ReadyTransaction.class)); + doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -805,7 +811,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } private static interface TransactionProxyOperation { @@ -875,20 +881,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { - doReturn(Futures.failed(new Exception("not found"))) + doReturn(Futures.failed(new PrimaryNotFoundException("test"))) .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + String actorPath = txActorRef.path().toString(); CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). setTransactionId("txn-1").setTransactionActorPath(actorPath). setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); + doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath); + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE)); - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -921,6 +930,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardFound(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -938,6 +948,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardNotFound(){ // Confirm that there is no throttling when the Shard is not found + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -956,6 +967,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -972,7 +984,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -989,7 +1001,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardNotFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1006,6 +1018,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1054,6 +1067,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1210,13 +1224,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { }, 2, true); } - @Test - public void testModificationOperationBatching() throws Throwable { + private void testModificationOperationBatching(TransactionType type) throws Exception { int shardBatchedModificationCount = 3; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). - when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type); expectBatchedModifications(actorRef, shardBatchedModificationCount); @@ -1243,7 +1255,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH; YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH; - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type); transactionProxy.write(writePath1, writeNode1); transactionProxy.write(writePath2, writeNode2); @@ -1260,24 +1272,46 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3), + boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); + verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + if(optimizedWriteOnly) { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class); + } else { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + } + } + + @Test + public void testReadWriteModificationOperationBatching() throws Throwable { + testModificationOperationBatching(READ_WRITE); + } + + @Test + public void testWriteOnlyModificationOperationBatching() throws Throwable { + testModificationOperationBatching(WRITE_ONLY); + } + + @Test + public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testModificationOperationBatching(WRITE_ONLY); } @Test public void testModificationOperationBatchingWithInterleavedReads() throws Throwable { + int shardBatchedModificationCount = 10; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). - when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -1333,13 +1367,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2)); - verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath)); + verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath)); InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync(