+ private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
+ return store.newReadOnlyTransaction(nextTransactionId());
+ }
+
+ private ReadWriteShardDataTreeTransaction readWriteTransaction() {
+ return store.newReadWriteTransaction(nextTransactionId());
+ }
+
+ @Test
+ public void testOnReceiveReadData() {
+ testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
+ testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
+ }
+
+ private void testOnReceiveReadData(final ActorRef transaction) {
+ transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
+ testKit.getRef());
+
+ ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
+
+ assertNotNull(reply.getNormalizedNode());
+ }
+
+ @Test
+ public void testOnReceiveReadDataWhenDataNotFound() {
+ testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
+ "testReadDataWhenDataNotFoundRO"));
+ testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
+ "testReadDataWhenDataNotFoundRW"));
+ }
+
+ private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
+ transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
+
+ ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
+
+ assertNull(reply.getNormalizedNode());
+ }
+
+ @Test
+ public void testOnReceiveDataExistsPositive() {
+ testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
+ testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
+ }
+
+ private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+ transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
+ testKit.getRef());
+
+ DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
+
+ assertTrue(reply.exists());
+ }
+
+ @Test
+ public void testOnReceiveDataExistsNegative() {
+ testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
+ testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
+ }
+
+ private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+ transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
+
+ DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
+
+ assertFalse(reply.exists());
+ }
+
+ @Test
+ public void testOnReceiveBatchedModifications() {
+ ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+ DataTreeModification mockModification = mock(DataTreeModification.class);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+ nextTransactionId(), mockModification);
+ final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
+ .build();
+
+ YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+ DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.addModification(new MergeModification(mergePath, mergeData));
+ batched.addModification(new DeleteModification(deletePath));
+
+ transaction.tell(batched, testKit.getRef());
+
+ BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
+ BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+ InOrder inOrder = inOrder(mockModification);
+ inOrder.verify(mockModification).write(writePath, writeData);
+ inOrder.verify(mockModification).merge(mergePath, mergeData);
+ inOrder.verify(mockModification).delete(deletePath);
+ }
+
+ @Test
+ public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
+
+ TestKit watcher = new TestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+
+ transaction.tell(batched, testKit.getRef());
+ BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
+ BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+ batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.setReady();
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, testKit.getRef());
+ testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
+ watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
+ }
+
+ @Test
+ public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
+
+ TestKit watcher = new TestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+ DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.setReady();
+ batched.setDoCommitOnReady(true);
+ batched.setTotalMessagesSent(1);
+
+ transaction.tell(batched, testKit.getRef());
+ testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
+ watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
+ }
+
+ @Test(expected = TestException.class)
+ public void testOnReceiveBatchedModificationsFailure() throws Exception {
+ ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+ DataTreeModification mockModification = mock(DataTreeModification.class);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+ nextTransactionId(), mockModification);
+ final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
+ "testOnReceiveBatchedModificationsFailure");
+
+ TestKit watcher = new TestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doThrow(new TestException()).when(mockModification).write(path, node);
+
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(path, node));
+
+ transaction.tell(batched, testKit.getRef());
+ testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+
+ batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.setReady();
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, testKit.getRef());
+ Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
+
+ if (failure != null) {
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());