Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.CheckedFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.SortedSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.config.Configuration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
68 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
69 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.mdsal.common.api.ReadFailedException;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.Promise;
84
85 @SuppressWarnings("resource")
86 public class TransactionProxyTest extends AbstractTransactionProxyTest {
87
88     @SuppressWarnings("serial")
89     static class TestException extends RuntimeException {
90     }
91
92     interface Invoker {
93         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
94     }
95
96     @Test
97     public void testRead() throws Exception {
98         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
99
100         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
101
102         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
103                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
104
105         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
106                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
107
108         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
109
110         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
111
112         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
113                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
114
115         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
116
117         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
118
119         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
120     }
121
122     @Test(expected = ReadFailedException.class)
123     public void testReadWithInvalidReplyMessageType() throws Exception {
124         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
125
126         doReturn(Futures.successful(new Object())).when(mockActorContext)
127                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
128
129         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
130
131         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
132     }
133
134     @Test(expected = TestException.class)
135     public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
136         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
137
138         doReturn(Futures.failed(new TestException())).when(mockActorContext)
139                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
140
141         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
142
143         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
144     }
145
146     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
147             throws Exception {
148         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
149
150         if (exToThrow instanceof PrimaryNotFoundException) {
151             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
152         } else {
153             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
154                     .findPrimaryShardAsync(anyString());
155         }
156
157         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
158                 any(ActorSelection.class), any(), any(Timeout.class));
159
160         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
161
162         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
163     }
164
165     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
166         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
167     }
168
169     @Test(expected = PrimaryNotFoundException.class)
170     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
171         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
172     }
173
174     @Test(expected = TimeoutException.class)
175     public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
176         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
177                 new Exception("reason")));
178     }
179
180     @Test(expected = TestException.class)
181     public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
182         testReadWithExceptionOnInitialCreateTransaction(new TestException());
183     }
184
185     @Test
186     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
187         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
188
189         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
190
191         expectBatchedModifications(actorRef, 1);
192
193         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
194                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
195
196         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
197
198         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
199
200         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
201                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
202
203         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
204         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
205
206         InOrder inOrder = Mockito.inOrder(mockActorContext);
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
209
210         inOrder.verify(mockActorContext).executeOperationAsync(
211                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
212     }
213
214     @Test(expected = IllegalStateException.class)
215     public void testReadPreConditionCheck() {
216         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
217         transactionProxy.read(TestModel.TEST_PATH);
218     }
219
220     @Test(expected = IllegalArgumentException.class)
221     public void testInvalidCreateTransactionReply() throws Exception {
222         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
223
224         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
225                 .actorSelection(actorRef.path().toString());
226
227         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
228                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
229
230         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
231             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
232             any(Timeout.class));
233
234         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
235
236         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
237     }
238
239     @Test
240     public void testExists() throws Exception {
241         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
242
243         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
244
245         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
246                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
247
248         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
249
250         assertEquals("Exists response", false, exists);
251
252         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
253                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
254
255         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
256
257         assertEquals("Exists response", true, exists);
258     }
259
260     @Test(expected = PrimaryNotFoundException.class)
261     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
262         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
263             proxy -> proxy.exists(TestModel.TEST_PATH));
264     }
265
266     @Test(expected = ReadFailedException.class)
267     public void testExistsWithInvalidReplyMessageType() throws Exception {
268         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
269
270         doReturn(Futures.successful(new Object())).when(mockActorContext)
271                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
272
273         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
274
275         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
276     }
277
278     @Test(expected = TestException.class)
279     public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
280         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
281
282         doReturn(Futures.failed(new TestException())).when(mockActorContext)
283                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
284
285         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
286
287         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
288     }
289
290     @Test
291     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
292         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
293
294         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
295
296         expectBatchedModifications(actorRef, 1);
297
298         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
300
301         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
302
303         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
304
305         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
306
307         assertEquals("Exists response", true, exists);
308
309         InOrder inOrder = Mockito.inOrder(mockActorContext);
310         inOrder.verify(mockActorContext).executeOperationAsync(
311                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
312
313         inOrder.verify(mockActorContext).executeOperationAsync(
314                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
315     }
316
317     @Test(expected = IllegalStateException.class)
318     public void testExistsPreConditionCheck() {
319         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
320         transactionProxy.exists(TestModel.TEST_PATH);
321     }
322
323     @Test
324     public void testWrite() throws Exception {
325         dataStoreContextBuilder.shardBatchedModificationCount(1);
326         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
327
328         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
329
330         expectBatchedModifications(actorRef, 1);
331
332         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
333
334         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
335
336         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
337     }
338
339     @Test
340     @SuppressWarnings("checkstyle:IllegalCatch")
341     public void testWriteAfterAsyncRead() throws Exception {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
343                 DefaultShardStrategy.DEFAULT_SHARD);
344
345         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
346         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
347                 eq(getSystem().actorSelection(actorRef.path())),
348                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
349
350         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
351                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
352
353         expectBatchedModificationsReady(actorRef);
354
355         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
356
357         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
358
359         final CountDownLatch readComplete = new CountDownLatch(1);
360         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
361         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
362                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
363                     @Override
364                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
365                         try {
366                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
367                         } catch (Exception e) {
368                             caughtEx.set(e);
369                         } finally {
370                             readComplete.countDown();
371                         }
372                     }
373
374                     @Override
375                     public void onFailure(final Throwable failure) {
376                         caughtEx.set(failure);
377                         readComplete.countDown();
378                     }
379                 }, MoreExecutors.directExecutor());
380
381         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
382
383         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
384
385         final Throwable t = caughtEx.get();
386         if (t != null) {
387             Throwables.propagateIfPossible(t, Exception.class);
388             throw new RuntimeException(t);
389         }
390
391         // This sends the batched modification.
392         transactionProxy.ready();
393
394         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
395     }
396
397     @Test(expected = IllegalStateException.class)
398     public void testWritePreConditionCheck() {
399         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
400         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
401     }
402
403     @Test(expected = IllegalStateException.class)
404     public void testWriteAfterReadyPreConditionCheck() {
405         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
406
407         transactionProxy.ready();
408
409         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
410     }
411
412     @Test
413     public void testMerge() throws Exception {
414         dataStoreContextBuilder.shardBatchedModificationCount(1);
415         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
416
417         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
418
419         expectBatchedModifications(actorRef, 1);
420
421         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
422
423         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
424
425         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
426     }
427
428     @Test
429     public void testDelete() throws Exception {
430         dataStoreContextBuilder.shardBatchedModificationCount(1);
431         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
432
433         expectBatchedModifications(actorRef, 1);
434
435         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
436
437         transactionProxy.delete(TestModel.TEST_PATH);
438
439         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
440     }
441
442     @Test
443     public void testReadWrite() throws Exception {
444         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
445
446         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
447
448         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
449                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
450
451         expectBatchedModifications(actorRef, 1);
452
453         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
454
455         transactionProxy.read(TestModel.TEST_PATH);
456
457         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
458
459         transactionProxy.read(TestModel.TEST_PATH);
460
461         transactionProxy.read(TestModel.TEST_PATH);
462
463         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
464         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
465
466         verifyBatchedModifications(batchedModifications.get(0), false,
467                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
468     }
469
470     @Test
471     public void testReadyWithReadWrite() throws Exception {
472         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
473
474         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
475
476         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
477                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
478
479         expectBatchedModificationsReady(actorRef, true);
480
481         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
482
483         transactionProxy.read(TestModel.TEST_PATH);
484
485         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
486
487         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
488
489         assertTrue(ready instanceof SingleCommitCohortProxy);
490
491         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
492
493         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
494         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
495
496         verifyBatchedModifications(batchedModifications.get(0), true, true,
497                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
498
499         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
500     }
501
502     @Test
503     public void testReadyWithNoModifications() throws Exception {
504         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
505
506         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
507                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
508
509         expectBatchedModificationsReady(actorRef, true);
510
511         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
512
513         transactionProxy.read(TestModel.TEST_PATH);
514
515         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
516
517         assertTrue(ready instanceof SingleCommitCohortProxy);
518
519         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
520
521         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
522         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
523
524         verifyBatchedModifications(batchedModifications.get(0), true, true);
525     }
526
527     @Test
528     public void testReadyWithMultipleShardWrites() throws Exception {
529         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
530
531         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
532                 TestModel.JUNK_QNAME.getLocalName());
533
534         expectBatchedModificationsReady(actorRef1);
535         expectBatchedModificationsReady(actorRef2);
536
537         ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
538
539         doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
540                 .actorSelection(actorRef3.path().toString());
541
542         doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
543                 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
544
545         expectReadyLocalTransaction(actorRef3, false);
546
547         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
548
549         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
550         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
551         transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
552
553         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
554
555         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
556
557         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
558                 actorSelection(actorRef2), actorSelection(actorRef3));
559
560         SortedSet<String> expShardNames =
561                 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
562                         TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
563
564         ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
565         verify(mockActorContext).executeOperationAsync(
566                 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
567         assertEquals("Participating shards present", true,
568                 batchedMods.getValue().getParticipatingShardNames().isPresent());
569         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
570
571         batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
572         verify(mockActorContext).executeOperationAsync(
573                 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
574         assertEquals("Participating shards present", true,
575                 batchedMods.getValue().getParticipatingShardNames().isPresent());
576         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
577
578         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
579         verify(mockActorContext).executeOperationAsync(
580                 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
581         assertEquals("Participating shards present", true,
582                 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
583         assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
584     }
585
586     @Test
587     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
588         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
589
590         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
591
592         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
593
594         expectBatchedModificationsReady(actorRef, true);
595
596         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
597
598         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
599
600         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
601
602         assertTrue(ready instanceof SingleCommitCohortProxy);
603
604         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
605
606         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
607         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
608
609         verifyBatchedModifications(batchedModifications.get(0), true, true,
610                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
611     }
612
613     @Test
614     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
615         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
616         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
617
618         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
619
620         expectBatchedModificationsReady(actorRef, true);
621
622         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
623
624         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
625
626         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
627
628         assertTrue(ready instanceof SingleCommitCohortProxy);
629
630         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
631
632         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
633         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
634
635         verifyBatchedModifications(batchedModifications.get(0), false,
636                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
637
638         verifyBatchedModifications(batchedModifications.get(1), true, true);
639     }
640
641     @Test
642     public void testReadyWithReplyFailure() throws Exception {
643         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
644
645         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
646
647         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
648
649         expectFailedBatchedModifications(actorRef);
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
654
655         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
656
657         assertTrue(ready instanceof SingleCommitCohortProxy);
658
659         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
660     }
661
662     @Test
663     public void testReadyWithDebugContextEnabled() throws Exception {
664         dataStoreContextBuilder.transactionDebugContextEnabled(true);
665
666         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
667
668         expectBatchedModificationsReady(actorRef, true);
669
670         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
671
672         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
673
674         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
675
676         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
677
678         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
679     }
680
681     @Test
682     public void testReadyWithLocalTransaction() throws Exception {
683         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
684
685         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
686                 .actorSelection(shardActorRef.path().toString());
687
688         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
689                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
690
691         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
692
693         expectReadyLocalTransaction(shardActorRef, true);
694
695         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
696         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
697
698         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
699         assertTrue(ready instanceof SingleCommitCohortProxy);
700         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
701
702         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
703         verify(mockActorContext).executeOperationAsync(
704                 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
705         assertEquals("Participating shards present", false,
706                 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
707     }
708
709     @Test
710     public void testReadyWithLocalTransactionWithFailure() throws Exception {
711         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
712
713         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
714                 .actorSelection(shardActorRef.path().toString());
715
716         DataTree mockDataTree = createDataTree();
717         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
718         doThrow(new RuntimeException("mock")).when(mockModification).ready();
719
720         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
721                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
722
723         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
724
725         expectReadyLocalTransaction(shardActorRef, true);
726
727         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
728         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
729
730         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
731         assertTrue(ready instanceof SingleCommitCohortProxy);
732         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
733     }
734
735     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
736         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
737
738         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
739
740         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
741
742         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
743
744         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
745
746         transactionProxy.delete(TestModel.TEST_PATH);
747
748         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
749
750         assertTrue(ready instanceof SingleCommitCohortProxy);
751
752         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
753     }
754
755     @Test
756     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
757         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
758     }
759
760     @Test
761     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
762         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
763     }
764
765     @Test
766     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
767         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
768     }
769
770     @Test
771     public void testReadyWithInvalidReplyMessageType() throws Exception {
772         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
773         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
774
775         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
776                 TestModel.JUNK_QNAME.getLocalName());
777
778         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
779                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
780
781         expectBatchedModificationsReady(actorRef2);
782
783         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
784
785         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
786         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
787
788         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
789
790         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
791
792         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
793                 IllegalArgumentException.class);
794     }
795
796     @Test
797     public void testGetIdentifier() {
798         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
799         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
800
801         Object id = transactionProxy.getIdentifier();
802         assertNotNull("getIdentifier returned null", id);
803         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
804     }
805
806     @Test
807     public void testClose() throws Exception {
808         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
809
810         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
811                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
812
813         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
814
815         transactionProxy.read(TestModel.TEST_PATH);
816
817         transactionProxy.close();
818
819         verify(mockActorContext).sendOperationAsync(
820                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
821     }
822
823     private interface TransactionProxyOperation {
824         void run(TransactionProxy transactionProxy);
825     }
826
827     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
828         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
829     }
830
831     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
832         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
833                 dataTree);
834     }
835
836     private void throttleOperation(final TransactionProxyOperation operation) {
837         throttleOperation(operation, 1, true);
838     }
839
840     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
841             final boolean shardFound) {
842         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
843                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
844     }
845
846     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
847             final boolean shardFound, final long expectedCompletionTime) {
848         ActorSystem actorSystem = getSystem();
849         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
850
851         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
852         // we now allow one extra permit to be allowed for ready
853         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
854                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
855                         .getDatastoreContext();
856
857         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
858                 .actorSelection(shardActorRef.path().toString());
859
860         if (shardFound) {
861             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
862                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
863             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
864                     .findPrimaryShardAsync(eq("cars"));
865
866         } else {
867             doReturn(Futures.failed(new Exception("not found")))
868                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
869         }
870
871         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
872                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
873                 any(Timeout.class));
874
875         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
876
877         long start = System.nanoTime();
878
879         operation.run(transactionProxy);
880
881         long end = System.nanoTime();
882
883         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
884                 expectedCompletionTime, end - start),
885                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
886
887     }
888
889     private void completeOperation(final TransactionProxyOperation operation) {
890         completeOperation(operation, true);
891     }
892
893     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
894         ActorSystem actorSystem = getSystem();
895         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
896
897         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
898                 .actorSelection(shardActorRef.path().toString());
899
900         if (shardFound) {
901             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
902                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
903         } else {
904             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
905                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
906         }
907
908         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
909         String actorPath = txActorRef.path().toString();
910         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
911                 DataStoreVersions.CURRENT_VERSION);
912
913         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
914
915         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
916                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
917                 any(Timeout.class));
918
919         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
920
921         long start = System.nanoTime();
922
923         operation.run(transactionProxy);
924
925         long end = System.nanoTime();
926
927         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
928                 .getOperationTimeoutInMillis());
929         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
930                 expected, end - start), end - start <= expected);
931     }
932
933     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
934         ActorSystem actorSystem = getSystem();
935         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
936
937         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
938                 .actorSelection(shardActorRef.path().toString());
939
940         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
941                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
942
943         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
944
945         long start = System.nanoTime();
946
947         operation.run(transactionProxy);
948
949         long end = System.nanoTime();
950
951         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
952                 .getOperationTimeoutInMillis());
953         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
954                 end - start <= expected);
955     }
956
957     private static DataTree createDataTree() {
958         DataTree dataTree = mock(DataTree.class);
959         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
960         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
961
962         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
963         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
964
965         return dataTree;
966     }
967
968     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
969         DataTree dataTree = mock(DataTree.class);
970         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
971         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
972
973         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
974         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
975         doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
976             any(YangInstanceIdentifier.class));
977
978         return dataTree;
979     }
980
981
982     @Test
983     public void testWriteCompletionForLocalShard() {
984         completeOperationLocal(transactionProxy -> {
985             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
986
987             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
988
989             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
990
991         }, createDataTree());
992     }
993
994     @Test
995     public void testWriteThrottlingWhenShardFound() {
996         throttleOperation(transactionProxy -> {
997             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
998
999             expectIncompleteBatchedModifications();
1000
1001             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1002
1003             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1004         });
1005     }
1006
1007     @Test
1008     public void testWriteThrottlingWhenShardNotFound() {
1009         // Confirm that there is no throttling when the Shard is not found
1010         completeOperation(transactionProxy -> {
1011             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1012
1013             expectBatchedModifications(2);
1014
1015             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1016
1017             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1018         }, false);
1019
1020     }
1021
1022
1023     @Test
1024     public void testWriteCompletion() {
1025         completeOperation(transactionProxy -> {
1026             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1027
1028             expectBatchedModifications(2);
1029
1030             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1031
1032             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1033         });
1034     }
1035
1036     @Test
1037     public void testMergeThrottlingWhenShardFound() {
1038         throttleOperation(transactionProxy -> {
1039             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1040
1041             expectIncompleteBatchedModifications();
1042
1043             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1044
1045             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1046         });
1047     }
1048
1049     @Test
1050     public void testMergeThrottlingWhenShardNotFound() {
1051         completeOperation(transactionProxy -> {
1052             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1053
1054             expectBatchedModifications(2);
1055
1056             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1057
1058             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1059         }, false);
1060     }
1061
1062     @Test
1063     public void testMergeCompletion() {
1064         completeOperation(transactionProxy -> {
1065             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1066
1067             expectBatchedModifications(2);
1068
1069             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1070
1071             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1072         });
1073
1074     }
1075
1076     @Test
1077     public void testMergeCompletionForLocalShard() {
1078         completeOperationLocal(transactionProxy -> {
1079             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1080
1081             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1082
1083             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1084
1085         }, createDataTree());
1086     }
1087
1088
1089     @Test
1090     public void testDeleteThrottlingWhenShardFound() {
1091
1092         throttleOperation(transactionProxy -> {
1093             expectIncompleteBatchedModifications();
1094
1095             transactionProxy.delete(TestModel.TEST_PATH);
1096
1097             transactionProxy.delete(TestModel.TEST_PATH);
1098         });
1099     }
1100
1101
1102     @Test
1103     public void testDeleteThrottlingWhenShardNotFound() {
1104
1105         completeOperation(transactionProxy -> {
1106             expectBatchedModifications(2);
1107
1108             transactionProxy.delete(TestModel.TEST_PATH);
1109
1110             transactionProxy.delete(TestModel.TEST_PATH);
1111         }, false);
1112     }
1113
1114     @Test
1115     public void testDeleteCompletionForLocalShard() {
1116         completeOperationLocal(transactionProxy -> {
1117
1118             transactionProxy.delete(TestModel.TEST_PATH);
1119
1120             transactionProxy.delete(TestModel.TEST_PATH);
1121         }, createDataTree());
1122
1123     }
1124
1125     @Test
1126     public void testDeleteCompletion() {
1127         completeOperation(transactionProxy -> {
1128             expectBatchedModifications(2);
1129
1130             transactionProxy.delete(TestModel.TEST_PATH);
1131
1132             transactionProxy.delete(TestModel.TEST_PATH);
1133         });
1134
1135     }
1136
1137     @Test
1138     public void testReadThrottlingWhenShardFound() {
1139
1140         throttleOperation(transactionProxy -> {
1141             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1142                     any(ActorSelection.class), eqReadData());
1143
1144             transactionProxy.read(TestModel.TEST_PATH);
1145
1146             transactionProxy.read(TestModel.TEST_PATH);
1147         });
1148     }
1149
1150     @Test
1151     public void testReadThrottlingWhenShardNotFound() {
1152
1153         completeOperation(transactionProxy -> {
1154             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1155                     any(ActorSelection.class), eqReadData());
1156
1157             transactionProxy.read(TestModel.TEST_PATH);
1158
1159             transactionProxy.read(TestModel.TEST_PATH);
1160         }, false);
1161     }
1162
1163
1164     @Test
1165     public void testReadCompletion() {
1166         completeOperation(transactionProxy -> {
1167             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1168
1169             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1170                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1171
1172             transactionProxy.read(TestModel.TEST_PATH);
1173
1174             transactionProxy.read(TestModel.TEST_PATH);
1175         });
1176
1177     }
1178
1179     @Test
1180     public void testReadCompletionForLocalShard() {
1181         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182         completeOperationLocal(transactionProxy -> {
1183             transactionProxy.read(TestModel.TEST_PATH);
1184
1185             transactionProxy.read(TestModel.TEST_PATH);
1186         }, createDataTree(nodeToRead));
1187
1188     }
1189
1190     @Test
1191     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1192         completeOperationLocal(transactionProxy -> {
1193             transactionProxy.read(TestModel.TEST_PATH);
1194
1195             transactionProxy.read(TestModel.TEST_PATH);
1196         }, createDataTree());
1197
1198     }
1199
1200     @Test
1201     public void testExistsThrottlingWhenShardFound() {
1202
1203         throttleOperation(transactionProxy -> {
1204             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1205                     any(ActorSelection.class), eqDataExists());
1206
1207             transactionProxy.exists(TestModel.TEST_PATH);
1208
1209             transactionProxy.exists(TestModel.TEST_PATH);
1210         });
1211     }
1212
1213     @Test
1214     public void testExistsThrottlingWhenShardNotFound() {
1215
1216         completeOperation(transactionProxy -> {
1217             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1218                     any(ActorSelection.class), eqDataExists());
1219
1220             transactionProxy.exists(TestModel.TEST_PATH);
1221
1222             transactionProxy.exists(TestModel.TEST_PATH);
1223         }, false);
1224     }
1225
1226
1227     @Test
1228     public void testExistsCompletion() {
1229         completeOperation(transactionProxy -> {
1230             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1231                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1232
1233             transactionProxy.exists(TestModel.TEST_PATH);
1234
1235             transactionProxy.exists(TestModel.TEST_PATH);
1236         });
1237
1238     }
1239
1240     @Test
1241     public void testExistsCompletionForLocalShard() {
1242         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1243         completeOperationLocal(transactionProxy -> {
1244             transactionProxy.exists(TestModel.TEST_PATH);
1245
1246             transactionProxy.exists(TestModel.TEST_PATH);
1247         }, createDataTree(nodeToRead));
1248
1249     }
1250
1251     @Test
1252     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1253         completeOperationLocal(transactionProxy -> {
1254             transactionProxy.exists(TestModel.TEST_PATH);
1255
1256             transactionProxy.exists(TestModel.TEST_PATH);
1257         }, createDataTree());
1258
1259     }
1260
1261     @Test
1262     public void testReadyThrottling() {
1263
1264         throttleOperation(transactionProxy -> {
1265             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1266
1267             expectBatchedModifications(1);
1268
1269             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1270
1271             transactionProxy.ready();
1272         });
1273     }
1274
1275     @Test
1276     public void testReadyThrottlingWithTwoTransactionContexts() {
1277         throttleOperation(transactionProxy -> {
1278             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1279             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1280
1281             expectBatchedModifications(2);
1282
1283             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1284
1285             // Trying to write to Cars will cause another transaction context to get created
1286             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1287
1288             // Now ready should block for both transaction contexts
1289             transactionProxy.ready();
1290         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1291                 .getOperationTimeoutInMillis()) * 2);
1292     }
1293
1294     private void testModificationOperationBatching(final TransactionType type) throws Exception {
1295         int shardBatchedModificationCount = 3;
1296         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1297
1298         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1299
1300         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1301
1302         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1303         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1304
1305         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1306         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1307
1308         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1309         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1310
1311         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1312         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1313
1314         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1315         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1316
1317         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1318         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1319
1320         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1321         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1322
1323         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1324
1325         transactionProxy.write(writePath1, writeNode1);
1326         transactionProxy.write(writePath2, writeNode2);
1327         transactionProxy.delete(deletePath1);
1328         transactionProxy.merge(mergePath1, mergeNode1);
1329         transactionProxy.merge(mergePath2, mergeNode2);
1330         transactionProxy.write(writePath3, writeNode3);
1331         transactionProxy.merge(mergePath3, mergeNode3);
1332         transactionProxy.delete(deletePath2);
1333
1334         // This sends the last batch.
1335         transactionProxy.ready();
1336
1337         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1338         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1339
1340         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1341                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1342
1343         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1344                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1345
1346         verifyBatchedModifications(batchedModifications.get(2), true, true,
1347                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1348
1349         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1350     }
1351
1352     @Test
1353     public void testReadWriteModificationOperationBatching() throws Exception {
1354         testModificationOperationBatching(READ_WRITE);
1355     }
1356
1357     @Test
1358     public void testWriteOnlyModificationOperationBatching() throws Exception {
1359         testModificationOperationBatching(WRITE_ONLY);
1360     }
1361
1362     @Test
1363     public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1364         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1365         testModificationOperationBatching(WRITE_ONLY);
1366     }
1367
1368     @Test
1369     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1370
1371         int shardBatchedModificationCount = 10;
1372         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1373
1374         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1375
1376         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1377
1378         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1379         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1380
1381         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1382         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1383
1384         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1385         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1386
1387         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1388         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1389
1390         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1391
1392         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1393                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1394
1395         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1396                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1397
1398         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1399                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1400
1401         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1402
1403         transactionProxy.write(writePath1, writeNode1);
1404         transactionProxy.write(writePath2, writeNode2);
1405
1406         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1407
1408         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1409         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1410
1411         transactionProxy.merge(mergePath1, mergeNode1);
1412         transactionProxy.merge(mergePath2, mergeNode2);
1413
1414         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1415
1416         transactionProxy.delete(deletePath);
1417
1418         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1419         assertEquals("Exists response", true, exists);
1420
1421         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1422         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1423
1424         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1425         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1426
1427         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1428                 new WriteModification(writePath2, writeNode2));
1429
1430         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1431                 new MergeModification(mergePath2, mergeNode2));
1432
1433         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1434
1435         InOrder inOrder = Mockito.inOrder(mockActorContext);
1436         inOrder.verify(mockActorContext).executeOperationAsync(
1437                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1438
1439         inOrder.verify(mockActorContext).executeOperationAsync(
1440                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1441
1442         inOrder.verify(mockActorContext).executeOperationAsync(
1443                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1444
1445         inOrder.verify(mockActorContext).executeOperationAsync(
1446                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1447
1448         inOrder.verify(mockActorContext).executeOperationAsync(
1449                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1450
1451         inOrder.verify(mockActorContext).executeOperationAsync(
1452                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1453     }
1454
1455     @Test
1456     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1457             java.util.concurrent.TimeoutException {
1458         SchemaContext schemaContext = SchemaContextHelper.full();
1459         Configuration configuration = mock(Configuration.class);
1460         doReturn(configuration).when(mockActorContext).getConfiguration();
1461         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1462         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1463
1464         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1465         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1466
1467         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1468         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1469
1470         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1471
1472         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1473
1474         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1475
1476         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1477                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1478
1479         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1480
1481         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1482
1483         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1484
1485         @SuppressWarnings("unchecked")
1486         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1487
1488         for (NormalizedNode<?,?> node : collection) {
1489             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1490         }
1491
1492         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1493                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1494
1495         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1496
1497         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1498                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1499
1500         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1501     }
1502
1503
1504     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1505         ActorSystem actorSystem = getSystem();
1506         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1507
1508         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1509                 .actorSelection(shardActorRef.path().toString());
1510
1511         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1512                 .findPrimaryShardAsync(eq(shardName));
1513
1514         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1515
1516         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1517                 .actorSelection(txActorRef.path().toString());
1518
1519         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1520                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1521                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1522
1523         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1524                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1525     }
1526 }