Convert DatastoreSnapshotRestore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyString;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.ArgumentMatchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Throwables;
33 import com.google.common.collect.ImmutableSortedSet;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.FluentFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.SortedSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.config.Configuration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
68 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
69 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.mdsal.common.api.ReadFailedException;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
83 import scala.concurrent.Promise;
84
85 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
86 public class TransactionProxyTest extends AbstractTransactionProxyTest {
87
88     @SuppressWarnings("serial")
89     static class TestException extends RuntimeException {
90     }
91
92     interface Invoker {
93         FluentFuture<?> invoke(TransactionProxy proxy);
94     }
95
96     @Test
97     public void testRead() throws Exception {
98         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
99
100         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
101
102         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
103                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
104
105         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
106                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
107
108         assertFalse("NormalizedNode isPresent", readOptional.isPresent());
109
110         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
111
112         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
113                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
114
115         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
116
117         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
118
119         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
120     }
121
122     @Test(expected = ReadFailedException.class)
123     public void testReadWithInvalidReplyMessageType() throws Throwable {
124         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
125
126         doReturn(Futures.successful(new Object())).when(mockActorContext)
127                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
128
129         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
130
131         try {
132             transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
133         } catch (ExecutionException e) {
134             throw e.getCause();
135         }
136     }
137
138     @Test(expected = TestException.class)
139     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
140         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
141
142         doReturn(Futures.failed(new TestException())).when(mockActorContext)
143                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
144
145         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
146
147         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
148     }
149
150     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
151             throws Throwable {
152         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
153
154         if (exToThrow instanceof PrimaryNotFoundException) {
155             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
156         } else {
157             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
158                     .findPrimaryShardAsync(anyString());
159         }
160
161         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
162                 any(ActorSelection.class), any(), any(Timeout.class));
163
164         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
165
166         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
167     }
168
169     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
170         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
171     }
172
173     @Test(expected = PrimaryNotFoundException.class)
174     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
175         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
176     }
177
178     @Test(expected = TestException.class)
179     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
180         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
181                 new TestException()));
182     }
183
184     @Test(expected = TestException.class)
185     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
186         testReadWithExceptionOnInitialCreateTransaction(new TestException());
187     }
188
189     @Test
190     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
191         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
192
193         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
194
195         expectBatchedModifications(actorRef, 1);
196
197         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
198                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
199
200         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
201
202         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
203
204         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
205                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
206
207         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
208         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
209
210         InOrder inOrder = Mockito.inOrder(mockActorContext);
211         inOrder.verify(mockActorContext).executeOperationAsync(
212                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
213
214         inOrder.verify(mockActorContext).executeOperationAsync(
215                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
216     }
217
218     @Test(expected = IllegalStateException.class)
219     public void testReadPreConditionCheck() {
220         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
221         transactionProxy.read(TestModel.TEST_PATH);
222     }
223
224     @Test(expected = IllegalArgumentException.class)
225     public void testInvalidCreateTransactionReply() throws Throwable {
226         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
227
228         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
229                 .actorSelection(actorRef.path().toString());
230
231         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
232                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
233
234         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
235             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
236             any(Timeout.class));
237
238         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
239
240         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
241     }
242
243     @Test
244     public void testExists() throws Exception {
245         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
246
247         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
248
249         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
251
252         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
253
254         assertEquals("Exists response", Boolean.FALSE, exists);
255
256         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
257                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
258
259         exists = transactionProxy.exists(TestModel.TEST_PATH).get();
260
261         assertEquals("Exists response", Boolean.TRUE, exists);
262     }
263
264     @Test(expected = PrimaryNotFoundException.class)
265     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
266         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
267             proxy -> proxy.exists(TestModel.TEST_PATH));
268     }
269
270     @Test(expected = ReadFailedException.class)
271     public void testExistsWithInvalidReplyMessageType() throws Throwable {
272         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
273
274         doReturn(Futures.successful(new Object())).when(mockActorContext)
275                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
276
277         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
278
279         try {
280             transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
281         } catch (ExecutionException e) {
282             throw e.getCause();
283         }
284     }
285
286     @Test(expected = TestException.class)
287     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
288         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
289
290         doReturn(Futures.failed(new TestException())).when(mockActorContext)
291                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
292
293         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
294
295         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
296     }
297
298     @Test
299     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
300         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
301
302         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
303
304         expectBatchedModifications(actorRef, 1);
305
306         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
307                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
308
309         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
310
311         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
312
313         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
314
315         assertEquals("Exists response", Boolean.TRUE, exists);
316
317         InOrder inOrder = Mockito.inOrder(mockActorContext);
318         inOrder.verify(mockActorContext).executeOperationAsync(
319                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
320
321         inOrder.verify(mockActorContext).executeOperationAsync(
322                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
323     }
324
325     @Test(expected = IllegalStateException.class)
326     public void testExistsPreConditionCheck() {
327         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
328         transactionProxy.exists(TestModel.TEST_PATH);
329     }
330
331     @Test
332     public void testWrite() {
333         dataStoreContextBuilder.shardBatchedModificationCount(1);
334         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
335
336         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
337
338         expectBatchedModifications(actorRef, 1);
339
340         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
341
342         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
343
344         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
345     }
346
347     @Test
348     @SuppressWarnings("checkstyle:IllegalCatch")
349     public void testWriteAfterAsyncRead() throws Exception {
350         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
351                 DefaultShardStrategy.DEFAULT_SHARD);
352
353         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
354         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
355                 eq(getSystem().actorSelection(actorRef.path())),
356                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
357
358         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
359                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
360
361         expectBatchedModificationsReady(actorRef);
362
363         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
364
365         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
366
367         final CountDownLatch readComplete = new CountDownLatch(1);
368         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
369         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
370                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
371                     @Override
372                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
373                         try {
374                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
375                         } catch (Exception e) {
376                             caughtEx.set(e);
377                         } finally {
378                             readComplete.countDown();
379                         }
380                     }
381
382                     @Override
383                     public void onFailure(final Throwable failure) {
384                         caughtEx.set(failure);
385                         readComplete.countDown();
386                     }
387                 }, MoreExecutors.directExecutor());
388
389         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
390
391         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
392
393         final Throwable t = caughtEx.get();
394         if (t != null) {
395             Throwables.propagateIfPossible(t, Exception.class);
396             throw new RuntimeException(t);
397         }
398
399         // This sends the batched modification.
400         transactionProxy.ready();
401
402         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
403     }
404
405     @Test(expected = IllegalStateException.class)
406     public void testWritePreConditionCheck() {
407         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
408         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409     }
410
411     @Test(expected = IllegalStateException.class)
412     public void testWriteAfterReadyPreConditionCheck() {
413         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
414
415         transactionProxy.ready();
416
417         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
418     }
419
420     @Test
421     public void testMerge() {
422         dataStoreContextBuilder.shardBatchedModificationCount(1);
423         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
424
425         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
426
427         expectBatchedModifications(actorRef, 1);
428
429         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
430
431         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
432
433         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
434     }
435
436     @Test
437     public void testDelete() {
438         dataStoreContextBuilder.shardBatchedModificationCount(1);
439         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
440
441         expectBatchedModifications(actorRef, 1);
442
443         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
444
445         transactionProxy.delete(TestModel.TEST_PATH);
446
447         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
448     }
449
450     @Test
451     public void testReadWrite() {
452         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
453
454         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
455
456         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
457                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
458
459         expectBatchedModifications(actorRef, 1);
460
461         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
462
463         transactionProxy.read(TestModel.TEST_PATH);
464
465         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
466
467         transactionProxy.read(TestModel.TEST_PATH);
468
469         transactionProxy.read(TestModel.TEST_PATH);
470
471         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
472         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
473
474         verifyBatchedModifications(batchedModifications.get(0), false,
475                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
476     }
477
478     @Test
479     public void testReadyWithReadWrite() {
480         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
481
482         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
483
484         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
485                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
486
487         expectBatchedModificationsReady(actorRef, true);
488
489         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
490
491         transactionProxy.read(TestModel.TEST_PATH);
492
493         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
494
495         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
496
497         assertTrue(ready instanceof SingleCommitCohortProxy);
498
499         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
500
501         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
502         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
503
504         verifyBatchedModifications(batchedModifications.get(0), true, true,
505                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
506
507         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
508     }
509
510     @Test
511     public void testReadyWithNoModifications() {
512         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
513
514         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
515                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
516
517         expectBatchedModificationsReady(actorRef, true);
518
519         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
520
521         transactionProxy.read(TestModel.TEST_PATH);
522
523         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
524
525         assertTrue(ready instanceof SingleCommitCohortProxy);
526
527         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
528
529         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
530         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
531
532         verifyBatchedModifications(batchedModifications.get(0), true, true);
533     }
534
535     @Test
536     public void testReadyWithMultipleShardWrites() {
537         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
538
539         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
540                 TestModel.JUNK_QNAME.getLocalName());
541
542         expectBatchedModificationsReady(actorRef1);
543         expectBatchedModificationsReady(actorRef2);
544
545         ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
546
547         doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
548                 .actorSelection(actorRef3.path().toString());
549
550         doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
551                 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
552
553         expectReadyLocalTransaction(actorRef3, false);
554
555         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
556
557         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
558         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
559         transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
560
561         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
562
563         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
564
565         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
566                 actorSelection(actorRef2), actorSelection(actorRef3));
567
568         SortedSet<String> expShardNames =
569                 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
570                         TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
571
572         ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
573         verify(mockActorContext).executeOperationAsync(
574                 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
575         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
576         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
577
578         batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
579         verify(mockActorContext).executeOperationAsync(
580                 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
581         assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
582         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
583
584         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
585         verify(mockActorContext).executeOperationAsync(
586                 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
587         assertTrue("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
588         assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
589     }
590
591     @Test
592     public void testReadyWithWriteOnlyAndLastBatchPending() {
593         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
594
595         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
596
597         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
598
599         expectBatchedModificationsReady(actorRef, true);
600
601         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
602
603         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
604
605         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
606
607         assertTrue(ready instanceof SingleCommitCohortProxy);
608
609         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
610
611         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
612         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
613
614         verifyBatchedModifications(batchedModifications.get(0), true, true,
615                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
616     }
617
618     @Test
619     public void testReadyWithWriteOnlyAndLastBatchEmpty() {
620         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
621         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
622
623         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
624
625         expectBatchedModificationsReady(actorRef, true);
626
627         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
628
629         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
630
631         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
632
633         assertTrue(ready instanceof SingleCommitCohortProxy);
634
635         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
636
637         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
638         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
639
640         verifyBatchedModifications(batchedModifications.get(0), false,
641                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
642
643         verifyBatchedModifications(batchedModifications.get(1), true, true);
644     }
645
646     @Test
647     public void testReadyWithReplyFailure() {
648         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
649
650         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
651
652         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
653
654         expectFailedBatchedModifications(actorRef);
655
656         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
657
658         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
659
660         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
661
662         assertTrue(ready instanceof SingleCommitCohortProxy);
663
664         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
665     }
666
667     @Test
668     public void testReadyWithDebugContextEnabled() {
669         dataStoreContextBuilder.transactionDebugContextEnabled(true);
670
671         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
672
673         expectBatchedModificationsReady(actorRef, true);
674
675         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
676
677         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
678
679         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
680
681         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
682
683         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
684     }
685
686     @Test
687     public void testReadyWithLocalTransaction() {
688         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
689
690         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
691                 .actorSelection(shardActorRef.path().toString());
692
693         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
694                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
695
696         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
697
698         expectReadyLocalTransaction(shardActorRef, true);
699
700         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
702
703         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
706
707         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
708         verify(mockActorContext).executeOperationAsync(
709                 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
710         assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
711     }
712
713     @Test
714     public void testReadyWithLocalTransactionWithFailure() {
715         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
716
717         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
718                 .actorSelection(shardActorRef.path().toString());
719
720         DataTree mockDataTree = createDataTree();
721         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
722         doThrow(new RuntimeException("mock")).when(mockModification).ready();
723
724         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
725                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
726
727         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
728
729         expectReadyLocalTransaction(shardActorRef, true);
730
731         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
732         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
733
734         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
735         assertTrue(ready instanceof SingleCommitCohortProxy);
736         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
737     }
738
739     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
740         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
741
742         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
743
744         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
745
746         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
747
748         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
749
750         transactionProxy.delete(TestModel.TEST_PATH);
751
752         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
753
754         assertTrue(ready instanceof SingleCommitCohortProxy);
755
756         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
757     }
758
759     @Test
760     public void testWriteOnlyTxWithPrimaryNotFoundException() {
761         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
762     }
763
764     @Test
765     public void testWriteOnlyTxWithNotInitializedException() {
766         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
767     }
768
769     @Test
770     public void testWriteOnlyTxWithNoShardLeaderException() {
771         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
772     }
773
774     @Test
775     public void testReadyWithInvalidReplyMessageType() {
776         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
777         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
778
779         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
780                 TestModel.JUNK_QNAME.getLocalName());
781
782         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
783                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
784
785         expectBatchedModificationsReady(actorRef2);
786
787         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
788
789         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
790         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
791
792         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
793
794         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
795
796         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
797                 IllegalArgumentException.class);
798     }
799
800     @Test
801     public void testGetIdentifier() {
802         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
803         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
804
805         Object id = transactionProxy.getIdentifier();
806         assertNotNull("getIdentifier returned null", id);
807         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
808     }
809
810     @Test
811     public void testClose() {
812         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
813
814         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
815                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
816
817         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
818
819         transactionProxy.read(TestModel.TEST_PATH);
820
821         transactionProxy.close();
822
823         verify(mockActorContext).sendOperationAsync(
824                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
825     }
826
827     private interface TransactionProxyOperation {
828         void run(TransactionProxy transactionProxy);
829     }
830
831     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
832         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
833     }
834
835     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
836         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
837                 dataTree);
838     }
839
840     private void throttleOperation(final TransactionProxyOperation operation) {
841         throttleOperation(operation, 1, true);
842     }
843
844     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
845             final boolean shardFound) {
846         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
847                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
848     }
849
850     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
851             final boolean shardFound, final long expectedCompletionTime) {
852         ActorSystem actorSystem = getSystem();
853         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
854
855         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
856         // we now allow one extra permit to be allowed for ready
857         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
858                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
859                         .getDatastoreContext();
860
861         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
862                 .actorSelection(shardActorRef.path().toString());
863
864         if (shardFound) {
865             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
866                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
867             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
868                     .findPrimaryShardAsync(eq("cars"));
869
870         } else {
871             doReturn(Futures.failed(new Exception("not found")))
872                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
873         }
874
875         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
876                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
877                 any(Timeout.class));
878
879         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
880
881         long start = System.nanoTime();
882
883         operation.run(transactionProxy);
884
885         long end = System.nanoTime();
886
887         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
888                 expectedCompletionTime, end - start),
889                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
890
891     }
892
893     private void completeOperation(final TransactionProxyOperation operation) {
894         completeOperation(operation, true);
895     }
896
897     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
898         ActorSystem actorSystem = getSystem();
899         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
900
901         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
902                 .actorSelection(shardActorRef.path().toString());
903
904         if (shardFound) {
905             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
906                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
907         } else {
908             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
909                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
910         }
911
912         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
913         String actorPath = txActorRef.path().toString();
914         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
915                 DataStoreVersions.CURRENT_VERSION);
916
917         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
918
919         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
920                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
921                 any(Timeout.class));
922
923         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
924
925         long start = System.nanoTime();
926
927         operation.run(transactionProxy);
928
929         long end = System.nanoTime();
930
931         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
932                 .getOperationTimeoutInMillis());
933         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
934                 expected, end - start), end - start <= expected);
935     }
936
937     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
938         ActorSystem actorSystem = getSystem();
939         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
940
941         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
942                 .actorSelection(shardActorRef.path().toString());
943
944         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
945                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
946
947         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
948
949         long start = System.nanoTime();
950
951         operation.run(transactionProxy);
952
953         long end = System.nanoTime();
954
955         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
956                 .getOperationTimeoutInMillis());
957         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
958                 end - start <= expected);
959     }
960
961     private static DataTree createDataTree() {
962         DataTree dataTree = mock(DataTree.class);
963         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
964         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
965
966         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
967         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
968
969         return dataTree;
970     }
971
972     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
973         DataTree dataTree = mock(DataTree.class);
974         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
975         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
976
977         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
978         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
979         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
980
981         return dataTree;
982     }
983
984
985     @Test
986     public void testWriteCompletionForLocalShard() {
987         completeOperationLocal(transactionProxy -> {
988             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
989
990             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
991
992             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
993
994         }, createDataTree());
995     }
996
997     @Test
998     public void testWriteThrottlingWhenShardFound() {
999         throttleOperation(transactionProxy -> {
1000             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1001
1002             expectIncompleteBatchedModifications();
1003
1004             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1005
1006             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1007         });
1008     }
1009
1010     @Test
1011     public void testWriteThrottlingWhenShardNotFound() {
1012         // Confirm that there is no throttling when the Shard is not found
1013         completeOperation(transactionProxy -> {
1014             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1015
1016             expectBatchedModifications(2);
1017
1018             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1019
1020             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1021         }, false);
1022
1023     }
1024
1025
1026     @Test
1027     public void testWriteCompletion() {
1028         completeOperation(transactionProxy -> {
1029             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1030
1031             expectBatchedModifications(2);
1032
1033             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1034
1035             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1036         });
1037     }
1038
1039     @Test
1040     public void testMergeThrottlingWhenShardFound() {
1041         throttleOperation(transactionProxy -> {
1042             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1043
1044             expectIncompleteBatchedModifications();
1045
1046             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1047
1048             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1049         });
1050     }
1051
1052     @Test
1053     public void testMergeThrottlingWhenShardNotFound() {
1054         completeOperation(transactionProxy -> {
1055             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1056
1057             expectBatchedModifications(2);
1058
1059             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1060
1061             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1062         }, false);
1063     }
1064
1065     @Test
1066     public void testMergeCompletion() {
1067         completeOperation(transactionProxy -> {
1068             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1069
1070             expectBatchedModifications(2);
1071
1072             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1073
1074             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1075         });
1076
1077     }
1078
1079     @Test
1080     public void testMergeCompletionForLocalShard() {
1081         completeOperationLocal(transactionProxy -> {
1082             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1083
1084             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1085
1086             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1087
1088         }, createDataTree());
1089     }
1090
1091
1092     @Test
1093     public void testDeleteThrottlingWhenShardFound() {
1094
1095         throttleOperation(transactionProxy -> {
1096             expectIncompleteBatchedModifications();
1097
1098             transactionProxy.delete(TestModel.TEST_PATH);
1099
1100             transactionProxy.delete(TestModel.TEST_PATH);
1101         });
1102     }
1103
1104
1105     @Test
1106     public void testDeleteThrottlingWhenShardNotFound() {
1107
1108         completeOperation(transactionProxy -> {
1109             expectBatchedModifications(2);
1110
1111             transactionProxy.delete(TestModel.TEST_PATH);
1112
1113             transactionProxy.delete(TestModel.TEST_PATH);
1114         }, false);
1115     }
1116
1117     @Test
1118     public void testDeleteCompletionForLocalShard() {
1119         completeOperationLocal(transactionProxy -> {
1120
1121             transactionProxy.delete(TestModel.TEST_PATH);
1122
1123             transactionProxy.delete(TestModel.TEST_PATH);
1124         }, createDataTree());
1125
1126     }
1127
1128     @Test
1129     public void testDeleteCompletion() {
1130         completeOperation(transactionProxy -> {
1131             expectBatchedModifications(2);
1132
1133             transactionProxy.delete(TestModel.TEST_PATH);
1134
1135             transactionProxy.delete(TestModel.TEST_PATH);
1136         });
1137
1138     }
1139
1140     @Test
1141     public void testReadThrottlingWhenShardFound() {
1142
1143         throttleOperation(transactionProxy -> {
1144             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1145                     any(ActorSelection.class), eqReadData());
1146
1147             transactionProxy.read(TestModel.TEST_PATH);
1148
1149             transactionProxy.read(TestModel.TEST_PATH);
1150         });
1151     }
1152
1153     @Test
1154     public void testReadThrottlingWhenShardNotFound() {
1155
1156         completeOperation(transactionProxy -> {
1157             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1158                     any(ActorSelection.class), eqReadData());
1159
1160             transactionProxy.read(TestModel.TEST_PATH);
1161
1162             transactionProxy.read(TestModel.TEST_PATH);
1163         }, false);
1164     }
1165
1166
1167     @Test
1168     public void testReadCompletion() {
1169         completeOperation(transactionProxy -> {
1170             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1171
1172             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1173                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1174
1175             transactionProxy.read(TestModel.TEST_PATH);
1176
1177             transactionProxy.read(TestModel.TEST_PATH);
1178         });
1179
1180     }
1181
1182     @Test
1183     public void testReadCompletionForLocalShard() {
1184         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1185         completeOperationLocal(transactionProxy -> {
1186             transactionProxy.read(TestModel.TEST_PATH);
1187
1188             transactionProxy.read(TestModel.TEST_PATH);
1189         }, createDataTree(nodeToRead));
1190
1191     }
1192
1193     @Test
1194     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1195         completeOperationLocal(transactionProxy -> {
1196             transactionProxy.read(TestModel.TEST_PATH);
1197
1198             transactionProxy.read(TestModel.TEST_PATH);
1199         }, createDataTree());
1200
1201     }
1202
1203     @Test
1204     public void testExistsThrottlingWhenShardFound() {
1205
1206         throttleOperation(transactionProxy -> {
1207             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1208                     any(ActorSelection.class), eqDataExists());
1209
1210             transactionProxy.exists(TestModel.TEST_PATH);
1211
1212             transactionProxy.exists(TestModel.TEST_PATH);
1213         });
1214     }
1215
1216     @Test
1217     public void testExistsThrottlingWhenShardNotFound() {
1218
1219         completeOperation(transactionProxy -> {
1220             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1221                     any(ActorSelection.class), eqDataExists());
1222
1223             transactionProxy.exists(TestModel.TEST_PATH);
1224
1225             transactionProxy.exists(TestModel.TEST_PATH);
1226         }, false);
1227     }
1228
1229
1230     @Test
1231     public void testExistsCompletion() {
1232         completeOperation(transactionProxy -> {
1233             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1234                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1235
1236             transactionProxy.exists(TestModel.TEST_PATH);
1237
1238             transactionProxy.exists(TestModel.TEST_PATH);
1239         });
1240
1241     }
1242
1243     @Test
1244     public void testExistsCompletionForLocalShard() {
1245         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1246         completeOperationLocal(transactionProxy -> {
1247             transactionProxy.exists(TestModel.TEST_PATH);
1248
1249             transactionProxy.exists(TestModel.TEST_PATH);
1250         }, createDataTree(nodeToRead));
1251
1252     }
1253
1254     @Test
1255     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1256         completeOperationLocal(transactionProxy -> {
1257             transactionProxy.exists(TestModel.TEST_PATH);
1258
1259             transactionProxy.exists(TestModel.TEST_PATH);
1260         }, createDataTree());
1261
1262     }
1263
1264     @Test
1265     public void testReadyThrottling() {
1266
1267         throttleOperation(transactionProxy -> {
1268             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1269
1270             expectBatchedModifications(1);
1271
1272             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1273
1274             transactionProxy.ready();
1275         });
1276     }
1277
1278     @Test
1279     public void testReadyThrottlingWithTwoTransactionContexts() {
1280         throttleOperation(transactionProxy -> {
1281             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1282             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1283
1284             expectBatchedModifications(2);
1285
1286             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1287
1288             // Trying to write to Cars will cause another transaction context to get created
1289             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1290
1291             // Now ready should block for both transaction contexts
1292             transactionProxy.ready();
1293         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1294                 .getOperationTimeoutInMillis()) * 2);
1295     }
1296
1297     private void testModificationOperationBatching(final TransactionType type) {
1298         int shardBatchedModificationCount = 3;
1299         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1300
1301         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1302
1303         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1304
1305         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1306         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1307
1308         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1309         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1310
1311         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1312         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1313
1314         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1315         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1316
1317         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1318         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1319
1320         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1321         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1322
1323         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1324         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1325
1326         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1327
1328         transactionProxy.write(writePath1, writeNode1);
1329         transactionProxy.write(writePath2, writeNode2);
1330         transactionProxy.delete(deletePath1);
1331         transactionProxy.merge(mergePath1, mergeNode1);
1332         transactionProxy.merge(mergePath2, mergeNode2);
1333         transactionProxy.write(writePath3, writeNode3);
1334         transactionProxy.merge(mergePath3, mergeNode3);
1335         transactionProxy.delete(deletePath2);
1336
1337         // This sends the last batch.
1338         transactionProxy.ready();
1339
1340         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1341         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1342
1343         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1344                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1345
1346         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1347                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1348
1349         verifyBatchedModifications(batchedModifications.get(2), true, true,
1350                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1351
1352         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1353     }
1354
1355     @Test
1356     public void testReadWriteModificationOperationBatching() {
1357         testModificationOperationBatching(READ_WRITE);
1358     }
1359
1360     @Test
1361     public void testWriteOnlyModificationOperationBatching() {
1362         testModificationOperationBatching(WRITE_ONLY);
1363     }
1364
1365     @Test
1366     public void testOptimizedWriteOnlyModificationOperationBatching() {
1367         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1368         testModificationOperationBatching(WRITE_ONLY);
1369     }
1370
1371     @Test
1372     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1373
1374         int shardBatchedModificationCount = 10;
1375         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1376
1377         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1378
1379         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1380
1381         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1382         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1383
1384         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1385         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1386
1387         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1388         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1389
1390         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1391         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1392
1393         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1394
1395         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1396                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1397
1398         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1399                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1400
1401         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1402                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1403
1404         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1405
1406         transactionProxy.write(writePath1, writeNode1);
1407         transactionProxy.write(writePath2, writeNode2);
1408
1409         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1410
1411         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1412         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1413
1414         transactionProxy.merge(mergePath1, mergeNode1);
1415         transactionProxy.merge(mergePath2, mergeNode2);
1416
1417         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1418
1419         transactionProxy.delete(deletePath);
1420
1421         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1422         assertEquals("Exists response", Boolean.TRUE, exists);
1423
1424         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1425         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1426
1427         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1428         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1429
1430         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1431                 new WriteModification(writePath2, writeNode2));
1432
1433         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1434                 new MergeModification(mergePath2, mergeNode2));
1435
1436         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1437
1438         InOrder inOrder = Mockito.inOrder(mockActorContext);
1439         inOrder.verify(mockActorContext).executeOperationAsync(
1440                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1441
1442         inOrder.verify(mockActorContext).executeOperationAsync(
1443                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1444
1445         inOrder.verify(mockActorContext).executeOperationAsync(
1446                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1447
1448         inOrder.verify(mockActorContext).executeOperationAsync(
1449                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1450
1451         inOrder.verify(mockActorContext).executeOperationAsync(
1452                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1453
1454         inOrder.verify(mockActorContext).executeOperationAsync(
1455                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1456     }
1457
1458     @Test
1459     public void testReadRoot() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1460         EffectiveModelContext schemaContext = SchemaContextHelper.full();
1461         Configuration configuration = mock(Configuration.class);
1462         doReturn(configuration).when(mockActorContext).getConfiguration();
1463         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1464         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1465
1466         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1467         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1468
1469         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1470         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1471
1472         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1473
1474         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1475
1476         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1477
1478         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1479                 YangInstanceIdentifier.empty()).get(5, TimeUnit.SECONDS);
1480
1481         assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1482
1483         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1484
1485         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1486
1487         @SuppressWarnings("unchecked")
1488         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1489
1490         for (NormalizedNode<?,?> node : collection) {
1491             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1492         }
1493
1494         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1495                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1496
1497         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1498
1499         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1500                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1501
1502         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1503     }
1504
1505
1506     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1507         ActorSystem actorSystem = getSystem();
1508         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1509
1510         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1511                 .actorSelection(shardActorRef.path().toString());
1512
1513         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1514                 .findPrimaryShardAsync(eq(shardName));
1515
1516         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1517
1518         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1519                 .actorSelection(txActorRef.path().toString());
1520
1521         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1522                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1523                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1524
1525         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1526                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.empty()), any(Timeout.class));
1527     }
1528 }