Align tested boolean/Boolean expectations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyString;
17 import static org.mockito.Matchers.eq;
18 import static org.mockito.Matchers.isA;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSelection;
29 import akka.actor.ActorSystem;
30 import akka.actor.Props;
31 import akka.dispatch.Futures;
32 import akka.util.Timeout;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.FluentFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.Optional;
43 import java.util.SortedSet;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Assert;
49 import org.junit.Test;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.InOrder;
52 import org.mockito.Mockito;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
65 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
66 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
69 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
70 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
71 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.mdsal.common.api.ReadFailedException;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
82 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
83 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
84 import scala.concurrent.Promise;
85
86 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
87 public class TransactionProxyTest extends AbstractTransactionProxyTest {
88
89     @SuppressWarnings("serial")
90     static class TestException extends RuntimeException {
91     }
92
93     interface Invoker {
94         FluentFuture<?> invoke(TransactionProxy proxy);
95     }
96
97     @Test
98     public void testRead() throws Exception {
99         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
100
101         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
102
103         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
104                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
105
106         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
107                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertFalse("NormalizedNode isPresent", readOptional.isPresent());
110
111         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
112
113         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
114                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
115
116         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
117
118         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
119
120         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
121     }
122
123     @Test(expected = ReadFailedException.class)
124     public void testReadWithInvalidReplyMessageType() throws Throwable {
125         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
126
127         doReturn(Futures.successful(new Object())).when(mockActorContext)
128                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
129
130         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
131
132         try {
133             transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
134         } catch (ExecutionException e) {
135             throw e.getCause();
136         }
137     }
138
139     @Test(expected = TestException.class)
140     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
141         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
142
143         doReturn(Futures.failed(new TestException())).when(mockActorContext)
144                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
145
146         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
147
148         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
149     }
150
151     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
152             throws Throwable {
153         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
154
155         if (exToThrow instanceof PrimaryNotFoundException) {
156             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
157         } else {
158             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
159                     .findPrimaryShardAsync(anyString());
160         }
161
162         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
163                 any(ActorSelection.class), any(), any(Timeout.class));
164
165         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
166
167         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
168     }
169
170     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
171         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
172     }
173
174     @Test(expected = PrimaryNotFoundException.class)
175     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
176         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
177     }
178
179     @Test(expected = TestException.class)
180     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
181         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
182                 new TestException()));
183     }
184
185     @Test(expected = TestException.class)
186     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
187         testReadWithExceptionOnInitialCreateTransaction(new TestException());
188     }
189
190     @Test
191     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
192         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
193
194         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
195
196         expectBatchedModifications(actorRef, 1);
197
198         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
199                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
200
201         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
202
203         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
204
205         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
206                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
207
208         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
209         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
210
211         InOrder inOrder = Mockito.inOrder(mockActorContext);
212         inOrder.verify(mockActorContext).executeOperationAsync(
213                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
214
215         inOrder.verify(mockActorContext).executeOperationAsync(
216                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
217     }
218
219     @Test(expected = IllegalStateException.class)
220     public void testReadPreConditionCheck() {
221         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
222         transactionProxy.read(TestModel.TEST_PATH);
223     }
224
225     @Test(expected = IllegalArgumentException.class)
226     public void testInvalidCreateTransactionReply() throws Throwable {
227         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
228
229         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
230                 .actorSelection(actorRef.path().toString());
231
232         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
233                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
234
235         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
236             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
237             any(Timeout.class));
238
239         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
240
241         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
242     }
243
244     @Test
245     public void testExists() throws Exception {
246         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
247
248         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
249
250         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
251                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
252
253         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
254
255         assertEquals("Exists response", Boolean.FALSE, exists);
256
257         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
258                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
259
260         exists = transactionProxy.exists(TestModel.TEST_PATH).get();
261
262         assertEquals("Exists response", Boolean.TRUE, exists);
263     }
264
265     @Test(expected = PrimaryNotFoundException.class)
266     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
267         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
268             proxy -> proxy.exists(TestModel.TEST_PATH));
269     }
270
271     @Test(expected = ReadFailedException.class)
272     public void testExistsWithInvalidReplyMessageType() throws Throwable {
273         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
274
275         doReturn(Futures.successful(new Object())).when(mockActorContext)
276                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
277
278         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
279
280         try {
281             transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
282         } catch (ExecutionException e) {
283             throw e.getCause();
284         }
285     }
286
287     @Test(expected = TestException.class)
288     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
289         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
290
291         doReturn(Futures.failed(new TestException())).when(mockActorContext)
292                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
293
294         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
295
296         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
297     }
298
299     @Test
300     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
301         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
302
303         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
304
305         expectBatchedModifications(actorRef, 1);
306
307         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
308                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
309
310         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
311
312         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
313
314         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
315
316         assertEquals("Exists response", Boolean.TRUE, exists);
317
318         InOrder inOrder = Mockito.inOrder(mockActorContext);
319         inOrder.verify(mockActorContext).executeOperationAsync(
320                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
321
322         inOrder.verify(mockActorContext).executeOperationAsync(
323                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
324     }
325
326     @Test(expected = IllegalStateException.class)
327     public void testExistsPreConditionCheck() {
328         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
329         transactionProxy.exists(TestModel.TEST_PATH);
330     }
331
332     @Test
333     public void testWrite() {
334         dataStoreContextBuilder.shardBatchedModificationCount(1);
335         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
336
337         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
338
339         expectBatchedModifications(actorRef, 1);
340
341         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
342
343         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
344
345         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
346     }
347
348     @Test
349     @SuppressWarnings("checkstyle:IllegalCatch")
350     public void testWriteAfterAsyncRead() throws Exception {
351         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
352                 DefaultShardStrategy.DEFAULT_SHARD);
353
354         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
355         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
356                 eq(getSystem().actorSelection(actorRef.path())),
357                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
358
359         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
360                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
361
362         expectBatchedModificationsReady(actorRef);
363
364         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
365
366         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
367
368         final CountDownLatch readComplete = new CountDownLatch(1);
369         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
370         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
371                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
372                     @Override
373                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
374                         try {
375                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
376                         } catch (Exception e) {
377                             caughtEx.set(e);
378                         } finally {
379                             readComplete.countDown();
380                         }
381                     }
382
383                     @Override
384                     public void onFailure(final Throwable failure) {
385                         caughtEx.set(failure);
386                         readComplete.countDown();
387                     }
388                 }, MoreExecutors.directExecutor());
389
390         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
391
392         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
393
394         final Throwable t = caughtEx.get();
395         if (t != null) {
396             Throwables.propagateIfPossible(t, Exception.class);
397             throw new RuntimeException(t);
398         }
399
400         // This sends the batched modification.
401         transactionProxy.ready();
402
403         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
404     }
405
406     @Test(expected = IllegalStateException.class)
407     public void testWritePreConditionCheck() {
408         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
409         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
410     }
411
412     @Test(expected = IllegalStateException.class)
413     public void testWriteAfterReadyPreConditionCheck() {
414         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
415
416         transactionProxy.ready();
417
418         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
419     }
420
421     @Test
422     public void testMerge() {
423         dataStoreContextBuilder.shardBatchedModificationCount(1);
424         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
425
426         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
427
428         expectBatchedModifications(actorRef, 1);
429
430         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
431
432         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
433
434         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
435     }
436
437     @Test
438     public void testDelete() {
439         dataStoreContextBuilder.shardBatchedModificationCount(1);
440         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
441
442         expectBatchedModifications(actorRef, 1);
443
444         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
445
446         transactionProxy.delete(TestModel.TEST_PATH);
447
448         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
449     }
450
451     @Test
452     public void testReadWrite() {
453         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
454
455         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
456
457         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
458                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
459
460         expectBatchedModifications(actorRef, 1);
461
462         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
463
464         transactionProxy.read(TestModel.TEST_PATH);
465
466         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
467
468         transactionProxy.read(TestModel.TEST_PATH);
469
470         transactionProxy.read(TestModel.TEST_PATH);
471
472         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
473         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
474
475         verifyBatchedModifications(batchedModifications.get(0), false,
476                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
477     }
478
479     @Test
480     public void testReadyWithReadWrite() {
481         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
482
483         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
484
485         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
486                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
487
488         expectBatchedModificationsReady(actorRef, true);
489
490         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
491
492         transactionProxy.read(TestModel.TEST_PATH);
493
494         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
495
496         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
497
498         assertTrue(ready instanceof SingleCommitCohortProxy);
499
500         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
501
502         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
503         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
504
505         verifyBatchedModifications(batchedModifications.get(0), true, true,
506                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
507
508         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
509     }
510
511     @Test
512     public void testReadyWithNoModifications() {
513         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
514
515         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
516                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
517
518         expectBatchedModificationsReady(actorRef, true);
519
520         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
521
522         transactionProxy.read(TestModel.TEST_PATH);
523
524         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
525
526         assertTrue(ready instanceof SingleCommitCohortProxy);
527
528         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
529
530         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
531         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
532
533         verifyBatchedModifications(batchedModifications.get(0), true, true);
534     }
535
536     @Test
537     public void testReadyWithMultipleShardWrites() {
538         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
539
540         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
541                 TestModel.JUNK_QNAME.getLocalName());
542
543         expectBatchedModificationsReady(actorRef1);
544         expectBatchedModificationsReady(actorRef2);
545
546         ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
547
548         doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
549                 .actorSelection(actorRef3.path().toString());
550
551         doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
552                 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
553
554         expectReadyLocalTransaction(actorRef3, false);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
559         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
560         transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
561
562         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
563
564         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
565
566         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
567                 actorSelection(actorRef2), actorSelection(actorRef3));
568
569         SortedSet<String> expShardNames =
570                 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
571                         TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
572
573         ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
574         verify(mockActorContext).executeOperationAsync(
575                 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
576         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
577         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
578
579         batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
580         verify(mockActorContext).executeOperationAsync(
581                 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
582         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
583         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
584
585         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
586         verify(mockActorContext).executeOperationAsync(
587                 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
588         assertTrue("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
589         assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
590     }
591
592     @Test
593     public void testReadyWithWriteOnlyAndLastBatchPending() {
594         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
595
596         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
597
598         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
599
600         expectBatchedModificationsReady(actorRef, true);
601
602         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
603
604         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
605
606         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
607
608         assertTrue(ready instanceof SingleCommitCohortProxy);
609
610         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
611
612         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
613         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
614
615         verifyBatchedModifications(batchedModifications.get(0), true, true,
616                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
617     }
618
619     @Test
620     public void testReadyWithWriteOnlyAndLastBatchEmpty() {
621         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
622         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
623
624         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
625
626         expectBatchedModificationsReady(actorRef, true);
627
628         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
629
630         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
631
632         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
633
634         assertTrue(ready instanceof SingleCommitCohortProxy);
635
636         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
637
638         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
639         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
640
641         verifyBatchedModifications(batchedModifications.get(0), false,
642                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
643
644         verifyBatchedModifications(batchedModifications.get(1), true, true);
645     }
646
647     @Test
648     public void testReadyWithReplyFailure() {
649         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
650
651         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
652
653         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
654
655         expectFailedBatchedModifications(actorRef);
656
657         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
658
659         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
660
661         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
662
663         assertTrue(ready instanceof SingleCommitCohortProxy);
664
665         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
666     }
667
668     @Test
669     public void testReadyWithDebugContextEnabled() {
670         dataStoreContextBuilder.transactionDebugContextEnabled(true);
671
672         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
673
674         expectBatchedModificationsReady(actorRef, true);
675
676         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
677
678         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
679
680         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
681
682         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
683
684         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
685     }
686
687     @Test
688     public void testReadyWithLocalTransaction() {
689         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
690
691         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
692                 .actorSelection(shardActorRef.path().toString());
693
694         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
695                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
696
697         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
698
699         expectReadyLocalTransaction(shardActorRef, true);
700
701         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
702         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
703
704         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
705         assertTrue(ready instanceof SingleCommitCohortProxy);
706         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
707
708         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
709         verify(mockActorContext).executeOperationAsync(
710                 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
711         assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
712     }
713
714     @Test
715     public void testReadyWithLocalTransactionWithFailure() {
716         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
717
718         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
719                 .actorSelection(shardActorRef.path().toString());
720
721         DataTree mockDataTree = createDataTree();
722         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
723         doThrow(new RuntimeException("mock")).when(mockModification).ready();
724
725         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
726                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
727
728         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
729
730         expectReadyLocalTransaction(shardActorRef, true);
731
732         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
733         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
734
735         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
736         assertTrue(ready instanceof SingleCommitCohortProxy);
737         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
738     }
739
740     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
741         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
742
743         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
744
745         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
746
747         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
748
749         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
750
751         transactionProxy.delete(TestModel.TEST_PATH);
752
753         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
754
755         assertTrue(ready instanceof SingleCommitCohortProxy);
756
757         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
758     }
759
760     @Test
761     public void testWriteOnlyTxWithPrimaryNotFoundException() {
762         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
763     }
764
765     @Test
766     public void testWriteOnlyTxWithNotInitializedException() {
767         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
768     }
769
770     @Test
771     public void testWriteOnlyTxWithNoShardLeaderException() {
772         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
773     }
774
775     @Test
776     public void testReadyWithInvalidReplyMessageType() {
777         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
778         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
779
780         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
781                 TestModel.JUNK_QNAME.getLocalName());
782
783         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
784                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
785
786         expectBatchedModificationsReady(actorRef2);
787
788         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
789
790         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
791         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
792
793         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
794
795         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
796
797         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
798                 IllegalArgumentException.class);
799     }
800
801     @Test
802     public void testGetIdentifier() {
803         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
804         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
805
806         Object id = transactionProxy.getIdentifier();
807         assertNotNull("getIdentifier returned null", id);
808         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
809     }
810
811     @Test
812     public void testClose() {
813         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
814
815         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
816                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
817
818         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
819
820         transactionProxy.read(TestModel.TEST_PATH);
821
822         transactionProxy.close();
823
824         verify(mockActorContext).sendOperationAsync(
825                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
826     }
827
828     private interface TransactionProxyOperation {
829         void run(TransactionProxy transactionProxy);
830     }
831
832     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
833         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
834     }
835
836     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
837         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
838                 dataTree);
839     }
840
841     private void throttleOperation(final TransactionProxyOperation operation) {
842         throttleOperation(operation, 1, true);
843     }
844
845     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
846             final boolean shardFound) {
847         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
848                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
849     }
850
851     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
852             final boolean shardFound, final long expectedCompletionTime) {
853         ActorSystem actorSystem = getSystem();
854         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
855
856         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
857         // we now allow one extra permit to be allowed for ready
858         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
859                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
860                         .getDatastoreContext();
861
862         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
863                 .actorSelection(shardActorRef.path().toString());
864
865         if (shardFound) {
866             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
867                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
868             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
869                     .findPrimaryShardAsync(eq("cars"));
870
871         } else {
872             doReturn(Futures.failed(new Exception("not found")))
873                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
874         }
875
876         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
877                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
878                 any(Timeout.class));
879
880         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
881
882         long start = System.nanoTime();
883
884         operation.run(transactionProxy);
885
886         long end = System.nanoTime();
887
888         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
889                 expectedCompletionTime, end - start),
890                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
891
892     }
893
894     private void completeOperation(final TransactionProxyOperation operation) {
895         completeOperation(operation, true);
896     }
897
898     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
899         ActorSystem actorSystem = getSystem();
900         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
901
902         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
903                 .actorSelection(shardActorRef.path().toString());
904
905         if (shardFound) {
906             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
907                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
908         } else {
909             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
910                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
911         }
912
913         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
914         String actorPath = txActorRef.path().toString();
915         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
916                 DataStoreVersions.CURRENT_VERSION);
917
918         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
919
920         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
921                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
922                 any(Timeout.class));
923
924         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
925
926         long start = System.nanoTime();
927
928         operation.run(transactionProxy);
929
930         long end = System.nanoTime();
931
932         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
933                 .getOperationTimeoutInMillis());
934         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
935                 expected, end - start), end - start <= expected);
936     }
937
938     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
939         ActorSystem actorSystem = getSystem();
940         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
941
942         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
943                 .actorSelection(shardActorRef.path().toString());
944
945         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
946                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
947
948         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
949
950         long start = System.nanoTime();
951
952         operation.run(transactionProxy);
953
954         long end = System.nanoTime();
955
956         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
957                 .getOperationTimeoutInMillis());
958         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
959                 end - start <= expected);
960     }
961
962     private static DataTree createDataTree() {
963         DataTree dataTree = mock(DataTree.class);
964         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
965         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
966
967         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
968         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
969
970         return dataTree;
971     }
972
973     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
974         DataTree dataTree = mock(DataTree.class);
975         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
976         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
977
978         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
979         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
980         doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
981             any(YangInstanceIdentifier.class));
982
983         return dataTree;
984     }
985
986
987     @Test
988     public void testWriteCompletionForLocalShard() {
989         completeOperationLocal(transactionProxy -> {
990             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
991
992             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
993
994             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
995
996         }, createDataTree());
997     }
998
999     @Test
1000     public void testWriteThrottlingWhenShardFound() {
1001         throttleOperation(transactionProxy -> {
1002             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1003
1004             expectIncompleteBatchedModifications();
1005
1006             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1007
1008             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1009         });
1010     }
1011
1012     @Test
1013     public void testWriteThrottlingWhenShardNotFound() {
1014         // Confirm that there is no throttling when the Shard is not found
1015         completeOperation(transactionProxy -> {
1016             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1017
1018             expectBatchedModifications(2);
1019
1020             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1021
1022             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1023         }, false);
1024
1025     }
1026
1027
1028     @Test
1029     public void testWriteCompletion() {
1030         completeOperation(transactionProxy -> {
1031             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1032
1033             expectBatchedModifications(2);
1034
1035             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1036
1037             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1038         });
1039     }
1040
1041     @Test
1042     public void testMergeThrottlingWhenShardFound() {
1043         throttleOperation(transactionProxy -> {
1044             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1045
1046             expectIncompleteBatchedModifications();
1047
1048             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1049
1050             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1051         });
1052     }
1053
1054     @Test
1055     public void testMergeThrottlingWhenShardNotFound() {
1056         completeOperation(transactionProxy -> {
1057             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1058
1059             expectBatchedModifications(2);
1060
1061             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1062
1063             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1064         }, false);
1065     }
1066
1067     @Test
1068     public void testMergeCompletion() {
1069         completeOperation(transactionProxy -> {
1070             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1071
1072             expectBatchedModifications(2);
1073
1074             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1075
1076             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1077         });
1078
1079     }
1080
1081     @Test
1082     public void testMergeCompletionForLocalShard() {
1083         completeOperationLocal(transactionProxy -> {
1084             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1085
1086             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1087
1088             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1089
1090         }, createDataTree());
1091     }
1092
1093
1094     @Test
1095     public void testDeleteThrottlingWhenShardFound() {
1096
1097         throttleOperation(transactionProxy -> {
1098             expectIncompleteBatchedModifications();
1099
1100             transactionProxy.delete(TestModel.TEST_PATH);
1101
1102             transactionProxy.delete(TestModel.TEST_PATH);
1103         });
1104     }
1105
1106
1107     @Test
1108     public void testDeleteThrottlingWhenShardNotFound() {
1109
1110         completeOperation(transactionProxy -> {
1111             expectBatchedModifications(2);
1112
1113             transactionProxy.delete(TestModel.TEST_PATH);
1114
1115             transactionProxy.delete(TestModel.TEST_PATH);
1116         }, false);
1117     }
1118
1119     @Test
1120     public void testDeleteCompletionForLocalShard() {
1121         completeOperationLocal(transactionProxy -> {
1122
1123             transactionProxy.delete(TestModel.TEST_PATH);
1124
1125             transactionProxy.delete(TestModel.TEST_PATH);
1126         }, createDataTree());
1127
1128     }
1129
1130     @Test
1131     public void testDeleteCompletion() {
1132         completeOperation(transactionProxy -> {
1133             expectBatchedModifications(2);
1134
1135             transactionProxy.delete(TestModel.TEST_PATH);
1136
1137             transactionProxy.delete(TestModel.TEST_PATH);
1138         });
1139
1140     }
1141
1142     @Test
1143     public void testReadThrottlingWhenShardFound() {
1144
1145         throttleOperation(transactionProxy -> {
1146             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1147                     any(ActorSelection.class), eqReadData());
1148
1149             transactionProxy.read(TestModel.TEST_PATH);
1150
1151             transactionProxy.read(TestModel.TEST_PATH);
1152         });
1153     }
1154
1155     @Test
1156     public void testReadThrottlingWhenShardNotFound() {
1157
1158         completeOperation(transactionProxy -> {
1159             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1160                     any(ActorSelection.class), eqReadData());
1161
1162             transactionProxy.read(TestModel.TEST_PATH);
1163
1164             transactionProxy.read(TestModel.TEST_PATH);
1165         }, false);
1166     }
1167
1168
1169     @Test
1170     public void testReadCompletion() {
1171         completeOperation(transactionProxy -> {
1172             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1173
1174             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1175                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1176
1177             transactionProxy.read(TestModel.TEST_PATH);
1178
1179             transactionProxy.read(TestModel.TEST_PATH);
1180         });
1181
1182     }
1183
1184     @Test
1185     public void testReadCompletionForLocalShard() {
1186         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1187         completeOperationLocal(transactionProxy -> {
1188             transactionProxy.read(TestModel.TEST_PATH);
1189
1190             transactionProxy.read(TestModel.TEST_PATH);
1191         }, createDataTree(nodeToRead));
1192
1193     }
1194
1195     @Test
1196     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1197         completeOperationLocal(transactionProxy -> {
1198             transactionProxy.read(TestModel.TEST_PATH);
1199
1200             transactionProxy.read(TestModel.TEST_PATH);
1201         }, createDataTree());
1202
1203     }
1204
1205     @Test
1206     public void testExistsThrottlingWhenShardFound() {
1207
1208         throttleOperation(transactionProxy -> {
1209             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1210                     any(ActorSelection.class), eqDataExists());
1211
1212             transactionProxy.exists(TestModel.TEST_PATH);
1213
1214             transactionProxy.exists(TestModel.TEST_PATH);
1215         });
1216     }
1217
1218     @Test
1219     public void testExistsThrottlingWhenShardNotFound() {
1220
1221         completeOperation(transactionProxy -> {
1222             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1223                     any(ActorSelection.class), eqDataExists());
1224
1225             transactionProxy.exists(TestModel.TEST_PATH);
1226
1227             transactionProxy.exists(TestModel.TEST_PATH);
1228         }, false);
1229     }
1230
1231
1232     @Test
1233     public void testExistsCompletion() {
1234         completeOperation(transactionProxy -> {
1235             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1236                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1237
1238             transactionProxy.exists(TestModel.TEST_PATH);
1239
1240             transactionProxy.exists(TestModel.TEST_PATH);
1241         });
1242
1243     }
1244
1245     @Test
1246     public void testExistsCompletionForLocalShard() {
1247         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1248         completeOperationLocal(transactionProxy -> {
1249             transactionProxy.exists(TestModel.TEST_PATH);
1250
1251             transactionProxy.exists(TestModel.TEST_PATH);
1252         }, createDataTree(nodeToRead));
1253
1254     }
1255
1256     @Test
1257     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1258         completeOperationLocal(transactionProxy -> {
1259             transactionProxy.exists(TestModel.TEST_PATH);
1260
1261             transactionProxy.exists(TestModel.TEST_PATH);
1262         }, createDataTree());
1263
1264     }
1265
1266     @Test
1267     public void testReadyThrottling() {
1268
1269         throttleOperation(transactionProxy -> {
1270             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1271
1272             expectBatchedModifications(1);
1273
1274             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1275
1276             transactionProxy.ready();
1277         });
1278     }
1279
1280     @Test
1281     public void testReadyThrottlingWithTwoTransactionContexts() {
1282         throttleOperation(transactionProxy -> {
1283             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1284             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1285
1286             expectBatchedModifications(2);
1287
1288             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1289
1290             // Trying to write to Cars will cause another transaction context to get created
1291             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1292
1293             // Now ready should block for both transaction contexts
1294             transactionProxy.ready();
1295         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1296                 .getOperationTimeoutInMillis()) * 2);
1297     }
1298
1299     private void testModificationOperationBatching(final TransactionType type) {
1300         int shardBatchedModificationCount = 3;
1301         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1302
1303         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1304
1305         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1306
1307         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1308         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1309
1310         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1311         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1312
1313         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1314         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1315
1316         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1317         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1318
1319         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1320         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1321
1322         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1323         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1324
1325         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1326         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1327
1328         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1329
1330         transactionProxy.write(writePath1, writeNode1);
1331         transactionProxy.write(writePath2, writeNode2);
1332         transactionProxy.delete(deletePath1);
1333         transactionProxy.merge(mergePath1, mergeNode1);
1334         transactionProxy.merge(mergePath2, mergeNode2);
1335         transactionProxy.write(writePath3, writeNode3);
1336         transactionProxy.merge(mergePath3, mergeNode3);
1337         transactionProxy.delete(deletePath2);
1338
1339         // This sends the last batch.
1340         transactionProxy.ready();
1341
1342         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1343         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1344
1345         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1346                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1347
1348         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1349                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1350
1351         verifyBatchedModifications(batchedModifications.get(2), true, true,
1352                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1353
1354         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1355     }
1356
1357     @Test
1358     public void testReadWriteModificationOperationBatching() {
1359         testModificationOperationBatching(READ_WRITE);
1360     }
1361
1362     @Test
1363     public void testWriteOnlyModificationOperationBatching() {
1364         testModificationOperationBatching(WRITE_ONLY);
1365     }
1366
1367     @Test
1368     public void testOptimizedWriteOnlyModificationOperationBatching() {
1369         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1370         testModificationOperationBatching(WRITE_ONLY);
1371     }
1372
1373     @Test
1374     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1375
1376         int shardBatchedModificationCount = 10;
1377         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1378
1379         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1380
1381         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1382
1383         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1384         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1385
1386         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1387         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1388
1389         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1390         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1391
1392         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1393         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1394
1395         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1396
1397         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1398                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1399
1400         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1401                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1402
1403         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1404                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1405
1406         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1407
1408         transactionProxy.write(writePath1, writeNode1);
1409         transactionProxy.write(writePath2, writeNode2);
1410
1411         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1412
1413         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1414         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1415
1416         transactionProxy.merge(mergePath1, mergeNode1);
1417         transactionProxy.merge(mergePath2, mergeNode2);
1418
1419         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1420
1421         transactionProxy.delete(deletePath);
1422
1423         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1424         assertEquals("Exists response", Boolean.TRUE, exists);
1425
1426         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1427         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1428
1429         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1430         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1431
1432         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1433                 new WriteModification(writePath2, writeNode2));
1434
1435         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1436                 new MergeModification(mergePath2, mergeNode2));
1437
1438         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1439
1440         InOrder inOrder = Mockito.inOrder(mockActorContext);
1441         inOrder.verify(mockActorContext).executeOperationAsync(
1442                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1443
1444         inOrder.verify(mockActorContext).executeOperationAsync(
1445                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1446
1447         inOrder.verify(mockActorContext).executeOperationAsync(
1448                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1449
1450         inOrder.verify(mockActorContext).executeOperationAsync(
1451                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1452
1453         inOrder.verify(mockActorContext).executeOperationAsync(
1454                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1455
1456         inOrder.verify(mockActorContext).executeOperationAsync(
1457                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1458     }
1459
1460     @Test
1461     public void testReadRoot() throws InterruptedException, ExecutionException,
1462             java.util.concurrent.TimeoutException {
1463         SchemaContext schemaContext = SchemaContextHelper.full();
1464         Configuration configuration = mock(Configuration.class);
1465         doReturn(configuration).when(mockActorContext).getConfiguration();
1466         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1467         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1468
1469         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1470         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1471
1472         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1473         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1474
1475         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1476
1477         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1478
1479         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1480
1481         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1482                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1483
1484         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1485
1486         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1487
1488         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1489
1490         @SuppressWarnings("unchecked")
1491         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1492
1493         for (NormalizedNode<?,?> node : collection) {
1494             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1495         }
1496
1497         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1498                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1499
1500         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1501
1502         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1503                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1504
1505         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1506     }
1507
1508
1509     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1510         ActorSystem actorSystem = getSystem();
1511         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1512
1513         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1514                 .actorSelection(shardActorRef.path().toString());
1515
1516         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1517                 .findPrimaryShardAsync(eq(shardName));
1518
1519         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1520
1521         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1522                 .actorSelection(txActorRef.path().toString());
1523
1524         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1525                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1526                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1527
1528         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1529                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1530     }
1531 }