Migrate from YangInstanceIdentifier.EMPTY
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.anyString;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.ArgumentMatchers.isA;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSelection;
29 import akka.actor.ActorSystem;
30 import akka.actor.Props;
31 import akka.dispatch.Futures;
32 import akka.util.Timeout;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.FluentFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.Optional;
43 import java.util.SortedSet;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Assert;
49 import org.junit.Test;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.InOrder;
52 import org.mockito.Mockito;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
65 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
66 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
69 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
70 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
71 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.mdsal.common.api.ReadFailedException;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
82 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
83 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
84 import scala.concurrent.Promise;
85
86 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
87 public class TransactionProxyTest extends AbstractTransactionProxyTest {
88
89     @SuppressWarnings("serial")
90     static class TestException extends RuntimeException {
91     }
92
93     interface Invoker {
94         FluentFuture<?> invoke(TransactionProxy proxy);
95     }
96
97     @Test
98     public void testRead() throws Exception {
99         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
100
101         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
102
103         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
104                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
105
106         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
107                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertFalse("NormalizedNode isPresent", readOptional.isPresent());
110
111         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
112
113         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
114                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
115
116         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
117
118         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
119
120         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
121     }
122
123     @Test(expected = ReadFailedException.class)
124     public void testReadWithInvalidReplyMessageType() throws Throwable {
125         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
126
127         doReturn(Futures.successful(new Object())).when(mockActorContext)
128                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
129
130         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
131
132         try {
133             transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
134         } catch (ExecutionException e) {
135             throw e.getCause();
136         }
137     }
138
139     @Test(expected = TestException.class)
140     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
141         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
142
143         doReturn(Futures.failed(new TestException())).when(mockActorContext)
144                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
145
146         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
147
148         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
149     }
150
151     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
152             throws Throwable {
153         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
154
155         if (exToThrow instanceof PrimaryNotFoundException) {
156             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
157         } else {
158             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
159                     .findPrimaryShardAsync(anyString());
160         }
161
162         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
163                 any(ActorSelection.class), any(), any(Timeout.class));
164
165         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
166
167         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
168     }
169
170     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
171         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
172     }
173
174     @Test(expected = PrimaryNotFoundException.class)
175     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
176         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
177     }
178
179     @Test(expected = TestException.class)
180     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
181         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
182                 new TestException()));
183     }
184
185     @Test(expected = TestException.class)
186     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
187         testReadWithExceptionOnInitialCreateTransaction(new TestException());
188     }
189
190     @Test
191     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
192         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
193
194         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
195
196         expectBatchedModifications(actorRef, 1);
197
198         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
199                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
200
201         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
202
203         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
204
205         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
206                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
207
208         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
209         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
210
211         InOrder inOrder = Mockito.inOrder(mockActorContext);
212         inOrder.verify(mockActorContext).executeOperationAsync(
213                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
214
215         inOrder.verify(mockActorContext).executeOperationAsync(
216                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
217     }
218
219     @Test(expected = IllegalStateException.class)
220     public void testReadPreConditionCheck() {
221         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
222         transactionProxy.read(TestModel.TEST_PATH);
223     }
224
225     @Test(expected = IllegalArgumentException.class)
226     public void testInvalidCreateTransactionReply() throws Throwable {
227         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
228
229         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
230                 .actorSelection(actorRef.path().toString());
231
232         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
233                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
234
235         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
236             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
237             any(Timeout.class));
238
239         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
240
241         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
242     }
243
244     @Test
245     public void testExists() throws Exception {
246         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
247
248         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
249
250         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
251                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
252
253         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
254
255         assertEquals("Exists response", Boolean.FALSE, exists);
256
257         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
258                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
259
260         exists = transactionProxy.exists(TestModel.TEST_PATH).get();
261
262         assertEquals("Exists response", Boolean.TRUE, exists);
263     }
264
265     @Test(expected = PrimaryNotFoundException.class)
266     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
267         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
268             proxy -> proxy.exists(TestModel.TEST_PATH));
269     }
270
271     @Test(expected = ReadFailedException.class)
272     public void testExistsWithInvalidReplyMessageType() throws Throwable {
273         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
274
275         doReturn(Futures.successful(new Object())).when(mockActorContext)
276                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
277
278         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
279
280         try {
281             transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
282         } catch (ExecutionException e) {
283             throw e.getCause();
284         }
285     }
286
287     @Test(expected = TestException.class)
288     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
289         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
290
291         doReturn(Futures.failed(new TestException())).when(mockActorContext)
292                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
293
294         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
295
296         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
297     }
298
299     @Test
300     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
301         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
302
303         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
304
305         expectBatchedModifications(actorRef, 1);
306
307         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
308                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
309
310         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
311
312         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
313
314         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
315
316         assertEquals("Exists response", Boolean.TRUE, exists);
317
318         InOrder inOrder = Mockito.inOrder(mockActorContext);
319         inOrder.verify(mockActorContext).executeOperationAsync(
320                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
321
322         inOrder.verify(mockActorContext).executeOperationAsync(
323                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
324     }
325
326     @Test(expected = IllegalStateException.class)
327     public void testExistsPreConditionCheck() {
328         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
329         transactionProxy.exists(TestModel.TEST_PATH);
330     }
331
332     @Test
333     public void testWrite() {
334         dataStoreContextBuilder.shardBatchedModificationCount(1);
335         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
336
337         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
338
339         expectBatchedModifications(actorRef, 1);
340
341         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
342
343         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
344
345         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
346     }
347
348     @Test
349     @SuppressWarnings("checkstyle:IllegalCatch")
350     public void testWriteAfterAsyncRead() throws Exception {
351         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
352                 DefaultShardStrategy.DEFAULT_SHARD);
353
354         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
355         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
356                 eq(getSystem().actorSelection(actorRef.path())),
357                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
358
359         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
360                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
361
362         expectBatchedModificationsReady(actorRef);
363
364         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
365
366         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
367
368         final CountDownLatch readComplete = new CountDownLatch(1);
369         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
370         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
371                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
372                     @Override
373                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
374                         try {
375                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
376                         } catch (Exception e) {
377                             caughtEx.set(e);
378                         } finally {
379                             readComplete.countDown();
380                         }
381                     }
382
383                     @Override
384                     public void onFailure(final Throwable failure) {
385                         caughtEx.set(failure);
386                         readComplete.countDown();
387                     }
388                 }, MoreExecutors.directExecutor());
389
390         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
391
392         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
393
394         final Throwable t = caughtEx.get();
395         if (t != null) {
396             Throwables.propagateIfPossible(t, Exception.class);
397             throw new RuntimeException(t);
398         }
399
400         // This sends the batched modification.
401         transactionProxy.ready();
402
403         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
404     }
405
406     @Test(expected = IllegalStateException.class)
407     public void testWritePreConditionCheck() {
408         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
409         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
410     }
411
412     @Test(expected = IllegalStateException.class)
413     public void testWriteAfterReadyPreConditionCheck() {
414         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
415
416         transactionProxy.ready();
417
418         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
419     }
420
421     @Test
422     public void testMerge() {
423         dataStoreContextBuilder.shardBatchedModificationCount(1);
424         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
425
426         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
427
428         expectBatchedModifications(actorRef, 1);
429
430         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
431
432         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
433
434         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
435     }
436
437     @Test
438     public void testDelete() {
439         dataStoreContextBuilder.shardBatchedModificationCount(1);
440         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
441
442         expectBatchedModifications(actorRef, 1);
443
444         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
445
446         transactionProxy.delete(TestModel.TEST_PATH);
447
448         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
449     }
450
451     @Test
452     public void testReadWrite() {
453         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
454
455         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
456
457         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
458                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
459
460         expectBatchedModifications(actorRef, 1);
461
462         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
463
464         transactionProxy.read(TestModel.TEST_PATH);
465
466         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
467
468         transactionProxy.read(TestModel.TEST_PATH);
469
470         transactionProxy.read(TestModel.TEST_PATH);
471
472         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
473         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
474
475         verifyBatchedModifications(batchedModifications.get(0), false,
476                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
477     }
478
479     @Test
480     public void testReadyWithReadWrite() {
481         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
482
483         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
484
485         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
486                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
487
488         expectBatchedModificationsReady(actorRef, true);
489
490         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
491
492         transactionProxy.read(TestModel.TEST_PATH);
493
494         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
495
496         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
497
498         assertTrue(ready instanceof SingleCommitCohortProxy);
499
500         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
501
502         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
503         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
504
505         verifyBatchedModifications(batchedModifications.get(0), true, true,
506                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
507
508         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
509     }
510
511     @Test
512     public void testReadyWithNoModifications() {
513         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
514
515         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
516                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
517
518         expectBatchedModificationsReady(actorRef, true);
519
520         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
521
522         transactionProxy.read(TestModel.TEST_PATH);
523
524         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
525
526         assertTrue(ready instanceof SingleCommitCohortProxy);
527
528         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
529
530         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
531         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
532
533         verifyBatchedModifications(batchedModifications.get(0), true, true);
534     }
535
536     @Test
537     public void testReadyWithMultipleShardWrites() {
538         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
539
540         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
541                 TestModel.JUNK_QNAME.getLocalName());
542
543         expectBatchedModificationsReady(actorRef1);
544         expectBatchedModificationsReady(actorRef2);
545
546         ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
547
548         doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
549                 .actorSelection(actorRef3.path().toString());
550
551         doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
552                 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
553
554         expectReadyLocalTransaction(actorRef3, false);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
559         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
560         transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
561
562         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
563
564         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
565
566         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
567                 actorSelection(actorRef2), actorSelection(actorRef3));
568
569         SortedSet<String> expShardNames =
570                 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
571                         TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
572
573         ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
574         verify(mockActorContext).executeOperationAsync(
575                 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
576         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
577         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
578
579         batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
580         verify(mockActorContext).executeOperationAsync(
581                 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
582         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
583         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
584
585         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
586         verify(mockActorContext).executeOperationAsync(
587                 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
588         assertTrue("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
589         assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
590     }
591
592     @Test
593     public void testReadyWithWriteOnlyAndLastBatchPending() {
594         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
595
596         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
597
598         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
599
600         expectBatchedModificationsReady(actorRef, true);
601
602         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
603
604         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
605
606         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
607
608         assertTrue(ready instanceof SingleCommitCohortProxy);
609
610         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
611
612         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
613         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
614
615         verifyBatchedModifications(batchedModifications.get(0), true, true,
616                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
617     }
618
619     @Test
620     public void testReadyWithWriteOnlyAndLastBatchEmpty() {
621         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
622         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
623
624         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
625
626         expectBatchedModificationsReady(actorRef, true);
627
628         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
629
630         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
631
632         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
633
634         assertTrue(ready instanceof SingleCommitCohortProxy);
635
636         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
637
638         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
639         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
640
641         verifyBatchedModifications(batchedModifications.get(0), false,
642                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
643
644         verifyBatchedModifications(batchedModifications.get(1), true, true);
645     }
646
647     @Test
648     public void testReadyWithReplyFailure() {
649         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
650
651         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
652
653         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
654
655         expectFailedBatchedModifications(actorRef);
656
657         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
658
659         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
660
661         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
662
663         assertTrue(ready instanceof SingleCommitCohortProxy);
664
665         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
666     }
667
668     @Test
669     public void testReadyWithDebugContextEnabled() {
670         dataStoreContextBuilder.transactionDebugContextEnabled(true);
671
672         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
673
674         expectBatchedModificationsReady(actorRef, true);
675
676         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
677
678         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
679
680         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
681
682         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
683
684         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
685     }
686
687     @Test
688     public void testReadyWithLocalTransaction() {
689         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
690
691         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
692                 .actorSelection(shardActorRef.path().toString());
693
694         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
695                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
696
697         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
698
699         expectReadyLocalTransaction(shardActorRef, true);
700
701         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
702         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
703
704         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
705         assertTrue(ready instanceof SingleCommitCohortProxy);
706         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
707
708         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
709         verify(mockActorContext).executeOperationAsync(
710                 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
711         assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
712     }
713
714     @Test
715     public void testReadyWithLocalTransactionWithFailure() {
716         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
717
718         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
719                 .actorSelection(shardActorRef.path().toString());
720
721         DataTree mockDataTree = createDataTree();
722         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
723         doThrow(new RuntimeException("mock")).when(mockModification).ready();
724
725         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
726                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
727
728         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
729
730         expectReadyLocalTransaction(shardActorRef, true);
731
732         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
733         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
734
735         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
736         assertTrue(ready instanceof SingleCommitCohortProxy);
737         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
738     }
739
740     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
741         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
742
743         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
744
745         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
746
747         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
748
749         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
750
751         transactionProxy.delete(TestModel.TEST_PATH);
752
753         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
754
755         assertTrue(ready instanceof SingleCommitCohortProxy);
756
757         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
758     }
759
760     @Test
761     public void testWriteOnlyTxWithPrimaryNotFoundException() {
762         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
763     }
764
765     @Test
766     public void testWriteOnlyTxWithNotInitializedException() {
767         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
768     }
769
770     @Test
771     public void testWriteOnlyTxWithNoShardLeaderException() {
772         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
773     }
774
775     @Test
776     public void testReadyWithInvalidReplyMessageType() {
777         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
778         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
779
780         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
781                 TestModel.JUNK_QNAME.getLocalName());
782
783         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
784                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
785
786         expectBatchedModificationsReady(actorRef2);
787
788         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
789
790         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
791         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
792
793         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
794
795         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
796
797         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
798                 IllegalArgumentException.class);
799     }
800
801     @Test
802     public void testGetIdentifier() {
803         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
804         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
805
806         Object id = transactionProxy.getIdentifier();
807         assertNotNull("getIdentifier returned null", id);
808         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
809     }
810
811     @Test
812     public void testClose() {
813         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
814
815         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
816                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
817
818         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
819
820         transactionProxy.read(TestModel.TEST_PATH);
821
822         transactionProxy.close();
823
824         verify(mockActorContext).sendOperationAsync(
825                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
826     }
827
828     private interface TransactionProxyOperation {
829         void run(TransactionProxy transactionProxy);
830     }
831
832     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
833         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
834     }
835
836     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
837         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
838                 dataTree);
839     }
840
841     private void throttleOperation(final TransactionProxyOperation operation) {
842         throttleOperation(operation, 1, true);
843     }
844
845     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
846             final boolean shardFound) {
847         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
848                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
849     }
850
851     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
852             final boolean shardFound, final long expectedCompletionTime) {
853         ActorSystem actorSystem = getSystem();
854         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
855
856         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
857         // we now allow one extra permit to be allowed for ready
858         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
859                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
860                         .getDatastoreContext();
861
862         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
863                 .actorSelection(shardActorRef.path().toString());
864
865         if (shardFound) {
866             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
867                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
868             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
869                     .findPrimaryShardAsync(eq("cars"));
870
871         } else {
872             doReturn(Futures.failed(new Exception("not found")))
873                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
874         }
875
876         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
877                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
878                 any(Timeout.class));
879
880         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
881
882         long start = System.nanoTime();
883
884         operation.run(transactionProxy);
885
886         long end = System.nanoTime();
887
888         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
889                 expectedCompletionTime, end - start),
890                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
891
892     }
893
894     private void completeOperation(final TransactionProxyOperation operation) {
895         completeOperation(operation, true);
896     }
897
898     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
899         ActorSystem actorSystem = getSystem();
900         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
901
902         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
903                 .actorSelection(shardActorRef.path().toString());
904
905         if (shardFound) {
906             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
907                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
908         } else {
909             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
910                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
911         }
912
913         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
914         String actorPath = txActorRef.path().toString();
915         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
916                 DataStoreVersions.CURRENT_VERSION);
917
918         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
919
920         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
921                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
922                 any(Timeout.class));
923
924         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
925
926         long start = System.nanoTime();
927
928         operation.run(transactionProxy);
929
930         long end = System.nanoTime();
931
932         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
933                 .getOperationTimeoutInMillis());
934         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
935                 expected, end - start), end - start <= expected);
936     }
937
938     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
939         ActorSystem actorSystem = getSystem();
940         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
941
942         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
943                 .actorSelection(shardActorRef.path().toString());
944
945         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
946                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
947
948         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
949
950         long start = System.nanoTime();
951
952         operation.run(transactionProxy);
953
954         long end = System.nanoTime();
955
956         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
957                 .getOperationTimeoutInMillis());
958         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
959                 end - start <= expected);
960     }
961
962     private static DataTree createDataTree() {
963         DataTree dataTree = mock(DataTree.class);
964         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
965         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
966
967         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
968         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
969
970         return dataTree;
971     }
972
973     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
974         DataTree dataTree = mock(DataTree.class);
975         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
976         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
977
978         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
979         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
980         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
981
982         return dataTree;
983     }
984
985
986     @Test
987     public void testWriteCompletionForLocalShard() {
988         completeOperationLocal(transactionProxy -> {
989             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
990
991             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
992
993             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
994
995         }, createDataTree());
996     }
997
998     @Test
999     public void testWriteThrottlingWhenShardFound() {
1000         throttleOperation(transactionProxy -> {
1001             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1002
1003             expectIncompleteBatchedModifications();
1004
1005             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1006
1007             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1008         });
1009     }
1010
1011     @Test
1012     public void testWriteThrottlingWhenShardNotFound() {
1013         // Confirm that there is no throttling when the Shard is not found
1014         completeOperation(transactionProxy -> {
1015             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1016
1017             expectBatchedModifications(2);
1018
1019             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1020
1021             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1022         }, false);
1023
1024     }
1025
1026
1027     @Test
1028     public void testWriteCompletion() {
1029         completeOperation(transactionProxy -> {
1030             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1031
1032             expectBatchedModifications(2);
1033
1034             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1035
1036             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1037         });
1038     }
1039
1040     @Test
1041     public void testMergeThrottlingWhenShardFound() {
1042         throttleOperation(transactionProxy -> {
1043             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1044
1045             expectIncompleteBatchedModifications();
1046
1047             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1048
1049             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1050         });
1051     }
1052
1053     @Test
1054     public void testMergeThrottlingWhenShardNotFound() {
1055         completeOperation(transactionProxy -> {
1056             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1057
1058             expectBatchedModifications(2);
1059
1060             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1061
1062             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1063         }, false);
1064     }
1065
1066     @Test
1067     public void testMergeCompletion() {
1068         completeOperation(transactionProxy -> {
1069             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1070
1071             expectBatchedModifications(2);
1072
1073             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1074
1075             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1076         });
1077
1078     }
1079
1080     @Test
1081     public void testMergeCompletionForLocalShard() {
1082         completeOperationLocal(transactionProxy -> {
1083             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1084
1085             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1086
1087             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1088
1089         }, createDataTree());
1090     }
1091
1092
1093     @Test
1094     public void testDeleteThrottlingWhenShardFound() {
1095
1096         throttleOperation(transactionProxy -> {
1097             expectIncompleteBatchedModifications();
1098
1099             transactionProxy.delete(TestModel.TEST_PATH);
1100
1101             transactionProxy.delete(TestModel.TEST_PATH);
1102         });
1103     }
1104
1105
1106     @Test
1107     public void testDeleteThrottlingWhenShardNotFound() {
1108
1109         completeOperation(transactionProxy -> {
1110             expectBatchedModifications(2);
1111
1112             transactionProxy.delete(TestModel.TEST_PATH);
1113
1114             transactionProxy.delete(TestModel.TEST_PATH);
1115         }, false);
1116     }
1117
1118     @Test
1119     public void testDeleteCompletionForLocalShard() {
1120         completeOperationLocal(transactionProxy -> {
1121
1122             transactionProxy.delete(TestModel.TEST_PATH);
1123
1124             transactionProxy.delete(TestModel.TEST_PATH);
1125         }, createDataTree());
1126
1127     }
1128
1129     @Test
1130     public void testDeleteCompletion() {
1131         completeOperation(transactionProxy -> {
1132             expectBatchedModifications(2);
1133
1134             transactionProxy.delete(TestModel.TEST_PATH);
1135
1136             transactionProxy.delete(TestModel.TEST_PATH);
1137         });
1138
1139     }
1140
1141     @Test
1142     public void testReadThrottlingWhenShardFound() {
1143
1144         throttleOperation(transactionProxy -> {
1145             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1146                     any(ActorSelection.class), eqReadData());
1147
1148             transactionProxy.read(TestModel.TEST_PATH);
1149
1150             transactionProxy.read(TestModel.TEST_PATH);
1151         });
1152     }
1153
1154     @Test
1155     public void testReadThrottlingWhenShardNotFound() {
1156
1157         completeOperation(transactionProxy -> {
1158             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1159                     any(ActorSelection.class), eqReadData());
1160
1161             transactionProxy.read(TestModel.TEST_PATH);
1162
1163             transactionProxy.read(TestModel.TEST_PATH);
1164         }, false);
1165     }
1166
1167
1168     @Test
1169     public void testReadCompletion() {
1170         completeOperation(transactionProxy -> {
1171             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1172
1173             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1174                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1175
1176             transactionProxy.read(TestModel.TEST_PATH);
1177
1178             transactionProxy.read(TestModel.TEST_PATH);
1179         });
1180
1181     }
1182
1183     @Test
1184     public void testReadCompletionForLocalShard() {
1185         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1186         completeOperationLocal(transactionProxy -> {
1187             transactionProxy.read(TestModel.TEST_PATH);
1188
1189             transactionProxy.read(TestModel.TEST_PATH);
1190         }, createDataTree(nodeToRead));
1191
1192     }
1193
1194     @Test
1195     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1196         completeOperationLocal(transactionProxy -> {
1197             transactionProxy.read(TestModel.TEST_PATH);
1198
1199             transactionProxy.read(TestModel.TEST_PATH);
1200         }, createDataTree());
1201
1202     }
1203
1204     @Test
1205     public void testExistsThrottlingWhenShardFound() {
1206
1207         throttleOperation(transactionProxy -> {
1208             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1209                     any(ActorSelection.class), eqDataExists());
1210
1211             transactionProxy.exists(TestModel.TEST_PATH);
1212
1213             transactionProxy.exists(TestModel.TEST_PATH);
1214         });
1215     }
1216
1217     @Test
1218     public void testExistsThrottlingWhenShardNotFound() {
1219
1220         completeOperation(transactionProxy -> {
1221             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1222                     any(ActorSelection.class), eqDataExists());
1223
1224             transactionProxy.exists(TestModel.TEST_PATH);
1225
1226             transactionProxy.exists(TestModel.TEST_PATH);
1227         }, false);
1228     }
1229
1230
1231     @Test
1232     public void testExistsCompletion() {
1233         completeOperation(transactionProxy -> {
1234             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1235                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1236
1237             transactionProxy.exists(TestModel.TEST_PATH);
1238
1239             transactionProxy.exists(TestModel.TEST_PATH);
1240         });
1241
1242     }
1243
1244     @Test
1245     public void testExistsCompletionForLocalShard() {
1246         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1247         completeOperationLocal(transactionProxy -> {
1248             transactionProxy.exists(TestModel.TEST_PATH);
1249
1250             transactionProxy.exists(TestModel.TEST_PATH);
1251         }, createDataTree(nodeToRead));
1252
1253     }
1254
1255     @Test
1256     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1257         completeOperationLocal(transactionProxy -> {
1258             transactionProxy.exists(TestModel.TEST_PATH);
1259
1260             transactionProxy.exists(TestModel.TEST_PATH);
1261         }, createDataTree());
1262
1263     }
1264
1265     @Test
1266     public void testReadyThrottling() {
1267
1268         throttleOperation(transactionProxy -> {
1269             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1270
1271             expectBatchedModifications(1);
1272
1273             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1274
1275             transactionProxy.ready();
1276         });
1277     }
1278
1279     @Test
1280     public void testReadyThrottlingWithTwoTransactionContexts() {
1281         throttleOperation(transactionProxy -> {
1282             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1283             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1284
1285             expectBatchedModifications(2);
1286
1287             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1288
1289             // Trying to write to Cars will cause another transaction context to get created
1290             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1291
1292             // Now ready should block for both transaction contexts
1293             transactionProxy.ready();
1294         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1295                 .getOperationTimeoutInMillis()) * 2);
1296     }
1297
1298     private void testModificationOperationBatching(final TransactionType type) {
1299         int shardBatchedModificationCount = 3;
1300         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1301
1302         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1303
1304         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1305
1306         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1307         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1308
1309         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1310         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1311
1312         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1313         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1314
1315         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1316         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1317
1318         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1319         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1320
1321         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1322         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1323
1324         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1325         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1326
1327         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1328
1329         transactionProxy.write(writePath1, writeNode1);
1330         transactionProxy.write(writePath2, writeNode2);
1331         transactionProxy.delete(deletePath1);
1332         transactionProxy.merge(mergePath1, mergeNode1);
1333         transactionProxy.merge(mergePath2, mergeNode2);
1334         transactionProxy.write(writePath3, writeNode3);
1335         transactionProxy.merge(mergePath3, mergeNode3);
1336         transactionProxy.delete(deletePath2);
1337
1338         // This sends the last batch.
1339         transactionProxy.ready();
1340
1341         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1342         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1343
1344         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1345                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1346
1347         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1348                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1349
1350         verifyBatchedModifications(batchedModifications.get(2), true, true,
1351                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1352
1353         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1354     }
1355
1356     @Test
1357     public void testReadWriteModificationOperationBatching() {
1358         testModificationOperationBatching(READ_WRITE);
1359     }
1360
1361     @Test
1362     public void testWriteOnlyModificationOperationBatching() {
1363         testModificationOperationBatching(WRITE_ONLY);
1364     }
1365
1366     @Test
1367     public void testOptimizedWriteOnlyModificationOperationBatching() {
1368         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1369         testModificationOperationBatching(WRITE_ONLY);
1370     }
1371
1372     @Test
1373     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1374
1375         int shardBatchedModificationCount = 10;
1376         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1377
1378         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1379
1380         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1381
1382         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1383         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1384
1385         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1386         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1387
1388         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1389         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1390
1391         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1392         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1393
1394         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1395
1396         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1397                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1398
1399         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1400                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1401
1402         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1403                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1404
1405         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1406
1407         transactionProxy.write(writePath1, writeNode1);
1408         transactionProxy.write(writePath2, writeNode2);
1409
1410         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1411
1412         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1413         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1414
1415         transactionProxy.merge(mergePath1, mergeNode1);
1416         transactionProxy.merge(mergePath2, mergeNode2);
1417
1418         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1419
1420         transactionProxy.delete(deletePath);
1421
1422         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1423         assertEquals("Exists response", Boolean.TRUE, exists);
1424
1425         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1426         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1427
1428         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1429         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1430
1431         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1432                 new WriteModification(writePath2, writeNode2));
1433
1434         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1435                 new MergeModification(mergePath2, mergeNode2));
1436
1437         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1438
1439         InOrder inOrder = Mockito.inOrder(mockActorContext);
1440         inOrder.verify(mockActorContext).executeOperationAsync(
1441                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1442
1443         inOrder.verify(mockActorContext).executeOperationAsync(
1444                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1445
1446         inOrder.verify(mockActorContext).executeOperationAsync(
1447                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1448
1449         inOrder.verify(mockActorContext).executeOperationAsync(
1450                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1451
1452         inOrder.verify(mockActorContext).executeOperationAsync(
1453                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1454
1455         inOrder.verify(mockActorContext).executeOperationAsync(
1456                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1457     }
1458
1459     @Test
1460     public void testReadRoot() throws InterruptedException, ExecutionException,
1461             java.util.concurrent.TimeoutException {
1462         SchemaContext schemaContext = SchemaContextHelper.full();
1463         Configuration configuration = mock(Configuration.class);
1464         doReturn(configuration).when(mockActorContext).getConfiguration();
1465         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1466         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1467
1468         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1469         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1470
1471         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1472         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1473
1474         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1475
1476         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1477
1478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1479
1480         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1481                 YangInstanceIdentifier.empty()).get(5, TimeUnit.SECONDS);
1482
1483         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1484
1485         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1486
1487         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1488
1489         @SuppressWarnings("unchecked")
1490         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1491
1492         for (NormalizedNode<?,?> node : collection) {
1493             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1494         }
1495
1496         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1497                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1498
1499         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1500
1501         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1502                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1503
1504         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1505     }
1506
1507
1508     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1509         ActorSystem actorSystem = getSystem();
1510         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1511
1512         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1513                 .actorSelection(shardActorRef.path().toString());
1514
1515         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1516                 .findPrimaryShardAsync(eq(shardName));
1517
1518         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1519
1520         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1521                 .actorSelection(txActorRef.path().toString());
1522
1523         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1524                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1525                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1526
1527         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1528                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.empty()), any(Timeout.class));
1529     }
1530 }