+ @Test(expected = TestException.class)
+ public void testOnReceiveBatchedModificationsFailure() throws Exception {
+ ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+ DataTreeModification mockModification = mock(DataTreeModification.class);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+ nextTransactionId(), mockModification);
+ final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
+ "testOnReceiveBatchedModificationsFailure");
+
+ TestKit watcher = new TestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doThrow(new TestException()).when(mockModification).write(path, node);
+
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(path, node));
+
+ transaction.tell(batched, testKit.getRef());
+ testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+
+ batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+ batched.setReady();
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, testKit.getRef());
+ Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
+
+ if (failure != null) {
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+ TestKit watcher = new TestKit(getSystem());
+ watcher.watch(transaction);
+
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+ DataStoreVersions.CURRENT_VERSION);
+ batched.setReady();
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, testKit.getRef());
+
+ Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
+
+ if (failure != null) {
+ Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
+ Throwables.throwIfUnchecked(failure.cause());
+ throw new RuntimeException(failure.cause());
+ }
+ }