01511982f9908671b2baf38dad8af0dfefb7fa3e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
51 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
56 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
57 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
58 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
59 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
60 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
61 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
62 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
63 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqSerializedReadData());
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqSerializedReadData());
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqSerializedReadData());
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqSerializedReadData());
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqSerializedDataExists());
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqSerializedDataExists());
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqSerializedDataExists());
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqSerializedDataExists());
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqSerializedReadData());
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqSerializedReadData());
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqSerializedReadData());
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqSerializedReadData());
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571
572         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
573                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
574     }
575
576     @Test
577     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
578         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
579         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
580
581         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
582
583         expectBatchedModificationsReady(actorRef, true);
584
585         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
586
587         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
588
589         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
590
591         assertTrue(ready instanceof SingleCommitCohortProxy);
592
593         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
594
595         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
596         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
597
598         verifyBatchedModifications(batchedModifications.get(0), false,
599                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
600
601         verifyBatchedModifications(batchedModifications.get(1), true, true);
602
603         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
604                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
605     }
606
607     @Test
608     public void testReadyWithReplyFailure() throws Exception {
609         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
610
611         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
612
613         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
614
615         expectFailedBatchedModifications(actorRef);
616
617         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
618
619         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
620
621         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
622
623         assertTrue(ready instanceof SingleCommitCohortProxy);
624
625         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
626     }
627
628     @Test
629     public void testReadyWithDebugContextEnabled() throws Exception {
630         dataStoreContextBuilder.transactionDebugContextEnabled(true);
631
632         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
633
634         expectBatchedModificationsReady(actorRef, true);
635
636         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
637
638         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
639
640         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
641
642         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
643
644         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
645     }
646
647     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
648         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
649
650         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
651
652         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
653
654         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
655
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         transactionProxy.delete(TestModel.TEST_PATH);
659
660         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
661
662         assertTrue(ready instanceof SingleCommitCohortProxy);
663
664         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
665     }
666
667     @Test
668     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
669         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
670     }
671
672     @Test
673     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
674         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
675     }
676
677     @Test
678     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
679         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
680     }
681
682     @Test
683     public void testReadyWithInvalidReplyMessageType() throws Exception {
684         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
685         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
686
687         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
688
689         doReturn(Futures.successful(new Object())).when(mockActorContext).
690                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
691
692         expectBatchedModificationsReady(actorRef2);
693
694         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
695
696         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
697         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
698
699         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
700
701         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
702
703         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
704                 IllegalArgumentException.class);
705     }
706
707     @Test
708     public void testGetIdentifier() {
709         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
710         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
711
712         Object id = transactionProxy.getIdentifier();
713         assertNotNull("getIdentifier returned null", id);
714         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
715     }
716
717     @Test
718     public void testClose() throws Exception{
719         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
720
721         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
722                 eq(actorSelection(actorRef)), eqSerializedReadData());
723
724         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
725
726         transactionProxy.read(TestModel.TEST_PATH);
727
728         transactionProxy.close();
729
730         verify(mockActorContext).sendOperationAsync(
731                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
732     }
733
734
735     /**
736      * Method to test a local Tx actor. The Tx paths are matched to decide if the
737      * Tx actor is local or not. This is done by mocking the Tx actor path
738      * and the caller paths and ensuring that the paths have the remote-address format
739      *
740      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
741      * the paths returned for the actors for all the tests are not qualified remote paths.
742      * Hence are treated as non-local/remote actors. In short, all tests except
743      * few below run for remote actors
744      *
745      * @throws Exception
746      */
747     @Test
748     public void testLocalTxActorRead() throws Exception {
749         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
750         doReturn(true).when(mockActorContext).isPathLocal(anyString());
751
752         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
753
754         // negative test case with null as the reply
755         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
756             any(ActorSelection.class), eqReadData());
757
758         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
759             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
760
761         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
762
763         // test case with node as read data reply
764         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
765
766         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
767             any(ActorSelection.class), eqReadData());
768
769         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
770
771         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
772
773         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
774
775         // test for local data exists
776         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
777             any(ActorSelection.class), eqDataExists());
778
779         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
780
781         assertEquals("Exists response", true, exists);
782     }
783
784     @Test
785     public void testLocalTxActorReady() throws Exception {
786         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
787         doReturn(true).when(mockActorContext).isPathLocal(anyString());
788
789         expectBatchedModificationsReady(actorRef, true);
790
791         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
792
793         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
794         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
795
796         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
797
798         assertTrue(ready instanceof SingleCommitCohortProxy);
799
800         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
801     }
802
803     private static interface TransactionProxyOperation {
804         void run(TransactionProxy transactionProxy);
805     }
806
807     private void throttleOperation(TransactionProxyOperation operation) {
808         throttleOperation(operation, 1, true);
809     }
810
811     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
812         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
813                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
814     }
815
816     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
817         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
818                 Optional.<DataTree>absent());
819     }
820
821     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
822         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
823                 dataTreeOptional);
824     }
825
826
827     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
828         ActorSystem actorSystem = getSystem();
829         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
830
831         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
832         // we now allow one extra permit to be allowed for ready
833         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
834                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
835
836         doReturn(actorSystem.actorSelection(shardActorRef.path())).
837                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
838
839         if(shardFound) {
840             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
841                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
842             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
843                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
844
845         } else {
846             doReturn(Futures.failed(new Exception("not found")))
847                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
848         }
849
850         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
851
852         doReturn(incompleteFuture()).when(mockActorContext).
853         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
854                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
855
856         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
857
858         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
859
860         long start = System.nanoTime();
861
862         operation.run(transactionProxy);
863
864         long end = System.nanoTime();
865
866         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
867                 expectedCompletionTime, (end-start)),
868                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
869
870     }
871
872     private void completeOperation(TransactionProxyOperation operation){
873         completeOperation(operation, true);
874     }
875
876     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
877         ActorSystem actorSystem = getSystem();
878         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
879
880         doReturn(actorSystem.actorSelection(shardActorRef.path())).
881                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
882
883         if(shardFound) {
884             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
885                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
886         } else {
887             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
888                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
889         }
890
891         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
892         String actorPath = txActorRef.path().toString();
893         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
894                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
895                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
896
897         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
898
899         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
900                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
901                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
902
903         doReturn(true).when(mockActorContext).isPathLocal(anyString());
904
905         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
906
907         long start = System.nanoTime();
908
909         operation.run(transactionProxy);
910
911         long end = System.nanoTime();
912
913         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
914         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
915                 expected, (end-start)), (end - start) <= expected);
916     }
917
918     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
919         ActorSystem actorSystem = getSystem();
920         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
921
922         doReturn(actorSystem.actorSelection(shardActorRef.path())).
923                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
924
925         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
926                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
927
928         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
929
930         long start = System.nanoTime();
931
932         operation.run(transactionProxy);
933
934         long end = System.nanoTime();
935
936         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
937         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
938                 expected, (end-start)), (end - start) <= expected);
939     }
940
941     private Optional<DataTree> createDataTree(){
942         DataTree dataTree = mock(DataTree.class);
943         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
944         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
945         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
946
947         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
948         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
949
950         return dataTreeOptional;
951     }
952
953     private Optional<DataTree> createDataTree(NormalizedNode readResponse){
954         DataTree dataTree = mock(DataTree.class);
955         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
956         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
957         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
958
959         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
960         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
961         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
962
963         return dataTreeOptional;
964     }
965
966
967     @Test
968     public void testWriteCompletionForLocalShard(){
969         completeOperationLocal(new TransactionProxyOperation() {
970             @Override
971             public void run(TransactionProxy transactionProxy) {
972                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
973
974                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
975
976                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
977
978             }
979         }, createDataTree());
980     }
981
982     @Test
983     public void testWriteThrottlingWhenShardFound(){
984         throttleOperation(new TransactionProxyOperation() {
985             @Override
986             public void run(TransactionProxy transactionProxy) {
987                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
988
989                 expectIncompleteBatchedModifications();
990
991                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
992
993                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
994             }
995         });
996     }
997
998     @Test
999     public void testWriteThrottlingWhenShardNotFound(){
1000         // Confirm that there is no throttling when the Shard is not found
1001         completeOperation(new TransactionProxyOperation() {
1002             @Override
1003             public void run(TransactionProxy transactionProxy) {
1004                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1005
1006                 expectBatchedModifications(2);
1007
1008                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1009
1010                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1011             }
1012         }, false);
1013
1014     }
1015
1016
1017     @Test
1018     public void testWriteCompletion(){
1019         completeOperation(new TransactionProxyOperation() {
1020             @Override
1021             public void run(TransactionProxy transactionProxy) {
1022                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1023
1024                 expectBatchedModifications(2);
1025
1026                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1027
1028                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1029             }
1030         });
1031     }
1032
1033     @Test
1034     public void testMergeThrottlingWhenShardFound(){
1035         throttleOperation(new TransactionProxyOperation() {
1036             @Override
1037             public void run(TransactionProxy transactionProxy) {
1038                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1039
1040                 expectIncompleteBatchedModifications();
1041
1042                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1043
1044                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1045             }
1046         });
1047     }
1048
1049     @Test
1050     public void testMergeThrottlingWhenShardNotFound(){
1051         completeOperation(new TransactionProxyOperation() {
1052             @Override
1053             public void run(TransactionProxy transactionProxy) {
1054                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1055
1056                 expectBatchedModifications(2);
1057
1058                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1059
1060                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1061             }
1062         }, false);
1063     }
1064
1065     @Test
1066     public void testMergeCompletion(){
1067         completeOperation(new TransactionProxyOperation() {
1068             @Override
1069             public void run(TransactionProxy transactionProxy) {
1070                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1071
1072                 expectBatchedModifications(2);
1073
1074                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1075
1076                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1077             }
1078         });
1079
1080     }
1081
1082     @Test
1083     public void testMergeCompletionForLocalShard(){
1084         completeOperationLocal(new TransactionProxyOperation() {
1085             @Override
1086             public void run(TransactionProxy transactionProxy) {
1087                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1088
1089                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1090
1091                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1092
1093             }
1094         }, createDataTree());
1095     }
1096
1097
1098     @Test
1099     public void testDeleteThrottlingWhenShardFound(){
1100
1101         throttleOperation(new TransactionProxyOperation() {
1102             @Override
1103             public void run(TransactionProxy transactionProxy) {
1104                 expectIncompleteBatchedModifications();
1105
1106                 transactionProxy.delete(TestModel.TEST_PATH);
1107
1108                 transactionProxy.delete(TestModel.TEST_PATH);
1109             }
1110         });
1111     }
1112
1113
1114     @Test
1115     public void testDeleteThrottlingWhenShardNotFound(){
1116
1117         completeOperation(new TransactionProxyOperation() {
1118             @Override
1119             public void run(TransactionProxy transactionProxy) {
1120                 expectBatchedModifications(2);
1121
1122                 transactionProxy.delete(TestModel.TEST_PATH);
1123
1124                 transactionProxy.delete(TestModel.TEST_PATH);
1125             }
1126         }, false);
1127     }
1128
1129     @Test
1130     public void testDeleteCompletionForLocalShard(){
1131         completeOperationLocal(new TransactionProxyOperation() {
1132             @Override
1133             public void run(TransactionProxy transactionProxy) {
1134
1135                 transactionProxy.delete(TestModel.TEST_PATH);
1136
1137                 transactionProxy.delete(TestModel.TEST_PATH);
1138             }
1139         }, createDataTree());
1140
1141     }
1142
1143     @Test
1144     public void testDeleteCompletion(){
1145         completeOperation(new TransactionProxyOperation() {
1146             @Override
1147             public void run(TransactionProxy transactionProxy) {
1148                 expectBatchedModifications(2);
1149
1150                 transactionProxy.delete(TestModel.TEST_PATH);
1151
1152                 transactionProxy.delete(TestModel.TEST_PATH);
1153             }
1154         });
1155
1156     }
1157
1158     @Test
1159     public void testReadThrottlingWhenShardFound(){
1160
1161         throttleOperation(new TransactionProxyOperation() {
1162             @Override
1163             public void run(TransactionProxy transactionProxy) {
1164                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1165                         any(ActorSelection.class), eqReadData());
1166
1167                 transactionProxy.read(TestModel.TEST_PATH);
1168
1169                 transactionProxy.read(TestModel.TEST_PATH);
1170             }
1171         });
1172     }
1173
1174     @Test
1175     public void testReadThrottlingWhenShardNotFound(){
1176
1177         completeOperation(new TransactionProxyOperation() {
1178             @Override
1179             public void run(TransactionProxy transactionProxy) {
1180                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1181                         any(ActorSelection.class), eqReadData());
1182
1183                 transactionProxy.read(TestModel.TEST_PATH);
1184
1185                 transactionProxy.read(TestModel.TEST_PATH);
1186             }
1187         }, false);
1188     }
1189
1190
1191     @Test
1192     public void testReadCompletion(){
1193         completeOperation(new TransactionProxyOperation() {
1194             @Override
1195             public void run(TransactionProxy transactionProxy) {
1196                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1197
1198                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1199                         any(ActorSelection.class), eqReadData());
1200
1201                 transactionProxy.read(TestModel.TEST_PATH);
1202
1203                 transactionProxy.read(TestModel.TEST_PATH);
1204             }
1205         });
1206
1207     }
1208
1209     @Test
1210     public void testReadCompletionForLocalShard(){
1211         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1212         completeOperationLocal(new TransactionProxyOperation() {
1213             @Override
1214             public void run(TransactionProxy transactionProxy) {
1215                 transactionProxy.read(TestModel.TEST_PATH);
1216
1217                 transactionProxy.read(TestModel.TEST_PATH);
1218             }
1219         }, createDataTree(nodeToRead));
1220
1221     }
1222
1223     @Test
1224     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1225         completeOperationLocal(new TransactionProxyOperation() {
1226             @Override
1227             public void run(TransactionProxy transactionProxy) {
1228                 transactionProxy.read(TestModel.TEST_PATH);
1229
1230                 transactionProxy.read(TestModel.TEST_PATH);
1231             }
1232         }, createDataTree());
1233
1234     }
1235
1236     @Test
1237     public void testExistsThrottlingWhenShardFound(){
1238
1239         throttleOperation(new TransactionProxyOperation() {
1240             @Override
1241             public void run(TransactionProxy transactionProxy) {
1242                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1243                         any(ActorSelection.class), eqDataExists());
1244
1245                 transactionProxy.exists(TestModel.TEST_PATH);
1246
1247                 transactionProxy.exists(TestModel.TEST_PATH);
1248             }
1249         });
1250     }
1251
1252     @Test
1253     public void testExistsThrottlingWhenShardNotFound(){
1254
1255         completeOperation(new TransactionProxyOperation() {
1256             @Override
1257             public void run(TransactionProxy transactionProxy) {
1258                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1259                         any(ActorSelection.class), eqDataExists());
1260
1261                 transactionProxy.exists(TestModel.TEST_PATH);
1262
1263                 transactionProxy.exists(TestModel.TEST_PATH);
1264             }
1265         }, false);
1266     }
1267
1268
1269     @Test
1270     public void testExistsCompletion(){
1271         completeOperation(new TransactionProxyOperation() {
1272             @Override
1273             public void run(TransactionProxy transactionProxy) {
1274                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1275                         any(ActorSelection.class), eqDataExists());
1276
1277                 transactionProxy.exists(TestModel.TEST_PATH);
1278
1279                 transactionProxy.exists(TestModel.TEST_PATH);
1280             }
1281         });
1282
1283     }
1284
1285     @Test
1286     public void testExistsCompletionForLocalShard(){
1287         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1288         completeOperationLocal(new TransactionProxyOperation() {
1289             @Override
1290             public void run(TransactionProxy transactionProxy) {
1291                 transactionProxy.exists(TestModel.TEST_PATH);
1292
1293                 transactionProxy.exists(TestModel.TEST_PATH);
1294             }
1295         }, createDataTree(nodeToRead));
1296
1297     }
1298
1299     @Test
1300     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1301         completeOperationLocal(new TransactionProxyOperation() {
1302             @Override
1303             public void run(TransactionProxy transactionProxy) {
1304                 transactionProxy.exists(TestModel.TEST_PATH);
1305
1306                 transactionProxy.exists(TestModel.TEST_PATH);
1307             }
1308         }, createDataTree());
1309
1310     }
1311     @Test
1312     public void testReadyThrottling(){
1313
1314         throttleOperation(new TransactionProxyOperation() {
1315             @Override
1316             public void run(TransactionProxy transactionProxy) {
1317                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1318
1319                 expectBatchedModifications(1);
1320
1321                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1322
1323                 transactionProxy.ready();
1324             }
1325         });
1326     }
1327
1328     @Test
1329     public void testReadyThrottlingWithTwoTransactionContexts(){
1330         throttleOperation(new TransactionProxyOperation() {
1331             @Override
1332             public void run(TransactionProxy transactionProxy) {
1333                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1334                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1335
1336                 expectBatchedModifications(2);
1337
1338                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1339                         any(ActorSelection.class), any(ReadyTransaction.class));
1340
1341                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1342
1343                 // Trying to write to Cars will cause another transaction context to get created
1344                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1345
1346                 // Now ready should block for both transaction contexts
1347                 transactionProxy.ready();
1348             }
1349         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1350     }
1351
1352     private void testModificationOperationBatching(TransactionType type) throws Exception {
1353         int shardBatchedModificationCount = 3;
1354         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1355
1356         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1357
1358         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1359
1360         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1361         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1362
1363         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1364         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1365
1366         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1367         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1368
1369         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1370         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1371
1372         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1373         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1374
1375         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1376         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1377
1378         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1379         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1380
1381         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1382
1383         transactionProxy.write(writePath1, writeNode1);
1384         transactionProxy.write(writePath2, writeNode2);
1385         transactionProxy.delete(deletePath1);
1386         transactionProxy.merge(mergePath1, mergeNode1);
1387         transactionProxy.merge(mergePath2, mergeNode2);
1388         transactionProxy.write(writePath3, writeNode3);
1389         transactionProxy.merge(mergePath3, mergeNode3);
1390         transactionProxy.delete(deletePath2);
1391
1392         // This sends the last batch.
1393         transactionProxy.ready();
1394
1395         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1396         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1397
1398         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1399                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1400
1401         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1402                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1403
1404         verifyBatchedModifications(batchedModifications.get(2), true, true,
1405                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1406
1407         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1408     }
1409
1410     @Test
1411     public void testReadWriteModificationOperationBatching() throws Throwable {
1412         testModificationOperationBatching(READ_WRITE);
1413     }
1414
1415     @Test
1416     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1417         testModificationOperationBatching(WRITE_ONLY);
1418     }
1419
1420     @Test
1421     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1422         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1423         testModificationOperationBatching(WRITE_ONLY);
1424     }
1425
1426     @Test
1427     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1428
1429         int shardBatchedModificationCount = 10;
1430         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1431
1432         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1433
1434         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1435
1436         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1437         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1438
1439         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1440         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1441
1442         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1443         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1444
1445         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1446         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1447
1448         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1449
1450         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1451                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1452
1453         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1454                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1455
1456         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1457                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1458
1459         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1460
1461         transactionProxy.write(writePath1, writeNode1);
1462         transactionProxy.write(writePath2, writeNode2);
1463
1464         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1465                 get(5, TimeUnit.SECONDS);
1466
1467         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1468         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1469
1470         transactionProxy.merge(mergePath1, mergeNode1);
1471         transactionProxy.merge(mergePath2, mergeNode2);
1472
1473         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1474
1475         transactionProxy.delete(deletePath);
1476
1477         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1478         assertEquals("Exists response", true, exists);
1479
1480         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1481         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1482
1483         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1484         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1485
1486         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1487                 new WriteModification(writePath2, writeNode2));
1488
1489         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1490                 new MergeModification(mergePath2, mergeNode2));
1491
1492         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1493
1494         InOrder inOrder = Mockito.inOrder(mockActorContext);
1495         inOrder.verify(mockActorContext).executeOperationAsync(
1496                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1497
1498         inOrder.verify(mockActorContext).executeOperationAsync(
1499                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1500
1501         inOrder.verify(mockActorContext).executeOperationAsync(
1502                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1503
1504         inOrder.verify(mockActorContext).executeOperationAsync(
1505                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1506
1507         inOrder.verify(mockActorContext).executeOperationAsync(
1508                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1509
1510         inOrder.verify(mockActorContext).executeOperationAsync(
1511                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1512     }
1513
1514     @Test
1515     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1516
1517         SchemaContext schemaContext = SchemaContextHelper.full();
1518         Configuration configuration = mock(Configuration.class);
1519         doReturn(configuration).when(mockActorContext).getConfiguration();
1520         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1521         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1522
1523         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1524         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1525
1526         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1527         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1528
1529         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1530
1531         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1532
1533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1534
1535         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1536                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1537
1538         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1539
1540         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1541
1542         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1543
1544         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1545
1546         for(NormalizedNode<?,?> node : collection){
1547             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1548         }
1549
1550         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1551                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1552
1553         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1554
1555         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1556                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1557
1558         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1559     }
1560
1561
1562     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1563         ActorSystem actorSystem = getSystem();
1564         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1565
1566         doReturn(getSystem().actorSelection(shardActorRef.path())).
1567                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1568
1569         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1570                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1571
1572         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1573
1574         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1575
1576         doReturn(actorSystem.actorSelection(txActorRef.path())).
1577                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1578
1579         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1580                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1581                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1582
1583         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1584                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1585     }
1586 }