Add unit test for ReadyLocalTransaction NPE
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.never;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.Sets;
34 import com.google.common.util.concurrent.CheckedFuture;
35 import com.google.common.util.concurrent.FutureCallback;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import java.util.Collection;
38 import java.util.List;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.Assert;
44 import org.junit.Test;
45 import org.mockito.InOrder;
46 import org.mockito.Mockito;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
52 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
53 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
57 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
58 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
59 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
60 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
61 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
62 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
63 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
64 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
67 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
68 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import scala.concurrent.Promise;
78
79 @SuppressWarnings("resource")
80 public class TransactionProxyTest extends AbstractTransactionProxyTest {
81
82     @SuppressWarnings("serial")
83     static class TestException extends RuntimeException {
84     }
85
86     static interface Invoker {
87         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
88     }
89
90     @Test
91     public void testRead() throws Exception {
92         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
93
94         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
95
96         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
97                 eq(actorSelection(actorRef)), eqSerializedReadData());
98
99         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
100                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
101
102         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
103
104         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
105
106         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
107                 eq(actorSelection(actorRef)), eqSerializedReadData());
108
109         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
110
111         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
112
113         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
114     }
115
116     @Test(expected = ReadFailedException.class)
117     public void testReadWithInvalidReplyMessageType() throws Exception {
118         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
119
120         doReturn(Futures.successful(new Object())).when(mockActorContext).
121                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
122
123         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
124
125         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
126     }
127
128     @Test(expected = TestException.class)
129     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
130         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
131
132         doReturn(Futures.failed(new TestException())).when(mockActorContext).
133                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
134
135         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
136
137         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
138     }
139
140     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
141             throws Throwable {
142         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
143
144         if (exToThrow instanceof PrimaryNotFoundException) {
145             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
146         } else {
147             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
148                     when(mockActorContext).findPrimaryShardAsync(anyString());
149         }
150
151         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
152                 any(ActorSelection.class), any(), any(Timeout.class));
153
154         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
155
156         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
157     }
158
159     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
160         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
161             @Override
162             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
163                 return proxy.read(TestModel.TEST_PATH);
164             }
165         });
166     }
167
168     @Test(expected = PrimaryNotFoundException.class)
169     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
170         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
171     }
172
173     @Test(expected = TimeoutException.class)
174     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
175         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
176                 new Exception("reason")));
177     }
178
179     @Test(expected = TestException.class)
180     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
181         testReadWithExceptionOnInitialCreateTransaction(new TestException());
182     }
183
184     @Test
185     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
186         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
187
188         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
189
190         expectBatchedModifications(actorRef, 1);
191
192         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
193                 eq(actorSelection(actorRef)), eqSerializedReadData());
194
195         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
196
197         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
198
199         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
200                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
201
202         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
203         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
204
205         InOrder inOrder = Mockito.inOrder(mockActorContext);
206         inOrder.verify(mockActorContext).executeOperationAsync(
207                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
208
209         inOrder.verify(mockActorContext).executeOperationAsync(
210                 eq(actorSelection(actorRef)), eqSerializedReadData());
211     }
212
213     @Test(expected=IllegalStateException.class)
214     public void testReadPreConditionCheck() {
215         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
216         transactionProxy.read(TestModel.TEST_PATH);
217     }
218
219     @Test(expected=IllegalArgumentException.class)
220     public void testInvalidCreateTransactionReply() throws Throwable {
221         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
222
223         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
224             actorSelection(actorRef.path().toString());
225
226         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
227             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
228
229         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
230             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
231             any(Timeout.class));
232
233         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
234
235         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
236     }
237
238     @Test
239     public void testExists() throws Exception {
240         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
241
242         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
243
244         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
245                 eq(actorSelection(actorRef)), eqSerializedDataExists());
246
247         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
248
249         assertEquals("Exists response", false, exists);
250
251         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
252                 eq(actorSelection(actorRef)), eqSerializedDataExists());
253
254         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
255
256         assertEquals("Exists response", true, exists);
257     }
258
259     @Test(expected = PrimaryNotFoundException.class)
260     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
261         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
262             @Override
263             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
264                 return proxy.exists(TestModel.TEST_PATH);
265             }
266         });
267     }
268
269     @Test(expected = ReadFailedException.class)
270     public void testExistsWithInvalidReplyMessageType() throws Exception {
271         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
272
273         doReturn(Futures.successful(new Object())).when(mockActorContext).
274                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
275
276         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
277
278         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
279     }
280
281     @Test(expected = TestException.class)
282     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
283         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
284
285         doReturn(Futures.failed(new TestException())).when(mockActorContext).
286                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
287
288         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
289
290         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
291     }
292
293     @Test
294     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
295         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
296
297         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
298
299         expectBatchedModifications(actorRef, 1);
300
301         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
302                 eq(actorSelection(actorRef)), eqSerializedDataExists());
303
304         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
305
306         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
307
308         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
309
310         assertEquals("Exists response", true, exists);
311
312         InOrder inOrder = Mockito.inOrder(mockActorContext);
313         inOrder.verify(mockActorContext).executeOperationAsync(
314                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
315
316         inOrder.verify(mockActorContext).executeOperationAsync(
317                 eq(actorSelection(actorRef)), eqSerializedDataExists());
318     }
319
320     @Test(expected=IllegalStateException.class)
321     public void testExistsPreConditionCheck() {
322         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
323         transactionProxy.exists(TestModel.TEST_PATH);
324     }
325
326     @Test
327     public void testWrite() throws Exception {
328         dataStoreContextBuilder.shardBatchedModificationCount(1);
329         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
330
331         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
332
333         expectBatchedModifications(actorRef, 1);
334
335         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
336
337         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
338
339         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
340     }
341
342     @Test
343     public void testWriteAfterAsyncRead() throws Throwable {
344         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
345
346         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
347         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
348                 eq(getSystem().actorSelection(actorRef.path())),
349                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
350
351         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
352                 eq(actorSelection(actorRef)), eqSerializedReadData());
353
354         expectBatchedModificationsReady(actorRef);
355
356         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
357
358         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
359
360         final CountDownLatch readComplete = new CountDownLatch(1);
361         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
362         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
363                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
364                     @Override
365                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
366                         try {
367                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
368                         } catch (Exception e) {
369                             caughtEx.set(e);
370                         } finally {
371                             readComplete.countDown();
372                         }
373                     }
374
375                     @Override
376                     public void onFailure(Throwable t) {
377                         caughtEx.set(t);
378                         readComplete.countDown();
379                     }
380                 });
381
382         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
383
384         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
385
386         if(caughtEx.get() != null) {
387             throw caughtEx.get();
388         }
389
390         // This sends the batched modification.
391         transactionProxy.ready();
392
393         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
394     }
395
396     @Test(expected=IllegalStateException.class)
397     public void testWritePreConditionCheck() {
398         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
399         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
400     }
401
402     @Test(expected=IllegalStateException.class)
403     public void testWriteAfterReadyPreConditionCheck() {
404         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
405
406         transactionProxy.ready();
407
408         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409     }
410
411     @Test
412     public void testMerge() throws Exception {
413         dataStoreContextBuilder.shardBatchedModificationCount(1);
414         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
415
416         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
417
418         expectBatchedModifications(actorRef, 1);
419
420         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
421
422         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
423
424         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
425     }
426
427     @Test
428     public void testDelete() throws Exception {
429         dataStoreContextBuilder.shardBatchedModificationCount(1);
430         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
431
432         expectBatchedModifications(actorRef, 1);
433
434         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
435
436         transactionProxy.delete(TestModel.TEST_PATH);
437
438         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
439     }
440
441     @Test
442     public void testReadWrite() throws Exception {
443         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
444
445         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
446
447         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
448                 eq(actorSelection(actorRef)), eqSerializedReadData());
449
450         expectBatchedModifications(actorRef, 1);
451
452         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
453
454         transactionProxy.read(TestModel.TEST_PATH);
455
456         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         transactionProxy.read(TestModel.TEST_PATH);
461
462         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
463         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
464
465         verifyBatchedModifications(batchedModifications.get(0), false,
466                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
467     }
468
469     @Test
470     public void testReadyWithReadWrite() throws Exception {
471         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
472
473         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
474
475         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
476                 eq(actorSelection(actorRef)), eqSerializedReadData());
477
478         expectBatchedModificationsReady(actorRef, true);
479
480         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
481
482         transactionProxy.read(TestModel.TEST_PATH);
483
484         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
485
486         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
487
488         assertTrue(ready instanceof SingleCommitCohortProxy);
489
490         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
491
492         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
493         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
494
495         verifyBatchedModifications(batchedModifications.get(0), true, true,
496                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
497
498         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
499     }
500
501     @Test
502     public void testReadyWithNoModifications() throws Exception {
503         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
504
505         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
506                 eq(actorSelection(actorRef)), eqSerializedReadData());
507
508         expectBatchedModificationsReady(actorRef, true);
509
510         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
511
512         transactionProxy.read(TestModel.TEST_PATH);
513
514         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
515
516         assertTrue(ready instanceof SingleCommitCohortProxy);
517
518         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
519
520         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
521         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
522
523         verifyBatchedModifications(batchedModifications.get(0), true, true);
524     }
525
526     @Test
527     public void testReadyWithMultipleShardWrites() throws Exception {
528         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
529
530         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
531
532         expectBatchedModificationsReady(actorRef1);
533         expectBatchedModificationsReady(actorRef2);
534
535         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
536
537         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
538         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
539
540         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
541
542         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
543
544         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
545                 actorSelection(actorRef2));
546     }
547
548     @Test
549     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
550         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
551
552         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
553
554         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
555
556         expectBatchedModificationsReady(actorRef, true);
557
558         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
559
560         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
561
562         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
563
564         assertTrue(ready instanceof SingleCommitCohortProxy);
565
566         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
567
568         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
569         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
570
571         verifyBatchedModifications(batchedModifications.get(0), true, true,
572                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
573
574         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
575                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
576     }
577
578     @Test
579     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
580         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
581         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
582
583         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
584
585         expectBatchedModificationsReady(actorRef, true);
586
587         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
588
589         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
590
591         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
592
593         assertTrue(ready instanceof SingleCommitCohortProxy);
594
595         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
596
597         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
598         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
599
600         verifyBatchedModifications(batchedModifications.get(0), false,
601                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
602
603         verifyBatchedModifications(batchedModifications.get(1), true, true);
604
605         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
606                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
607     }
608
609     @Test
610     public void testReadyWithReplyFailure() throws Exception {
611         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
612
613         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
614
615         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
616
617         expectFailedBatchedModifications(actorRef);
618
619         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
620
621         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
622
623         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
624
625         assertTrue(ready instanceof SingleCommitCohortProxy);
626
627         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
628     }
629
630     @Test
631     public void testReadyWithDebugContextEnabled() throws Exception {
632         dataStoreContextBuilder.transactionDebugContextEnabled(true);
633
634         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
635
636         expectBatchedModificationsReady(actorRef, true);
637
638         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
639
640         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
641
642         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
643
644         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
645
646         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
647     }
648
649     @Test
650     public void testReadyWithLocalTransaction() throws Exception {
651         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
652
653         doReturn(getSystem().actorSelection(shardActorRef.path())).
654                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
655
656         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
657                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
658
659         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
660
661         expectReadyLocalTransaction(shardActorRef, true);
662
663         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
664         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
665
666         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
667         assertTrue(ready instanceof SingleCommitCohortProxy);
668         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
669     }
670
671     @Test
672     public void testReadyWithLocalTransactionWithFailure() throws Exception {
673         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
674
675         doReturn(getSystem().actorSelection(shardActorRef.path())).
676                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
677
678         Optional<DataTree> mockDataTree = createDataTree();
679         DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification();
680         doThrow(new RuntimeException("mock")).when(mockModification).ready();
681
682         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
683                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
684
685         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
686
687         expectReadyLocalTransaction(shardActorRef, true);
688
689         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
690         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
691
692         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
693         assertTrue(ready instanceof SingleCommitCohortProxy);
694         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
695     }
696
697     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
698         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
699
700         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
701
702         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
703
704         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
705
706         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
707
708         transactionProxy.delete(TestModel.TEST_PATH);
709
710         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
711
712         assertTrue(ready instanceof SingleCommitCohortProxy);
713
714         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
715     }
716
717     @Test
718     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
719         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
720     }
721
722     @Test
723     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
724         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
725     }
726
727     @Test
728     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
729         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
730     }
731
732     @Test
733     public void testReadyWithInvalidReplyMessageType() throws Exception {
734         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
735         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
736
737         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
738
739         doReturn(Futures.successful(new Object())).when(mockActorContext).
740                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
741
742         expectBatchedModificationsReady(actorRef2);
743
744         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
745
746         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
747         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
748
749         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
750
751         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
752
753         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
754                 IllegalArgumentException.class);
755     }
756
757     @Test
758     public void testGetIdentifier() {
759         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
760         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
761
762         Object id = transactionProxy.getIdentifier();
763         assertNotNull("getIdentifier returned null", id);
764         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
765     }
766
767     @Test
768     public void testClose() throws Exception{
769         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
770
771         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
772                 eq(actorSelection(actorRef)), eqSerializedReadData());
773
774         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
775
776         transactionProxy.read(TestModel.TEST_PATH);
777
778         transactionProxy.close();
779
780         verify(mockActorContext).sendOperationAsync(
781                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
782     }
783
784
785     /**
786      * Method to test a local Tx actor. The Tx paths are matched to decide if the
787      * Tx actor is local or not. This is done by mocking the Tx actor path
788      * and the caller paths and ensuring that the paths have the remote-address format
789      *
790      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
791      * the paths returned for the actors for all the tests are not qualified remote paths.
792      * Hence are treated as non-local/remote actors. In short, all tests except
793      * few below run for remote actors
794      *
795      * @throws Exception
796      */
797     @Test
798     public void testLocalTxActorRead() throws Exception {
799         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
800         doReturn(true).when(mockActorContext).isPathLocal(anyString());
801
802         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
803
804         // negative test case with null as the reply
805         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
806             any(ActorSelection.class), eqReadData());
807
808         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
809             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
810
811         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
812
813         // test case with node as read data reply
814         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
815
816         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
817             any(ActorSelection.class), eqReadData());
818
819         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
820
821         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
822
823         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
824
825         // test for local data exists
826         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
827             any(ActorSelection.class), eqDataExists());
828
829         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
830
831         assertEquals("Exists response", true, exists);
832     }
833
834     @Test
835     public void testLocalTxActorReady() throws Exception {
836         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
837         doReturn(true).when(mockActorContext).isPathLocal(anyString());
838
839         expectBatchedModificationsReady(actorRef, true);
840
841         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
842
843         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
844         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
845
846         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
847
848         assertTrue(ready instanceof SingleCommitCohortProxy);
849
850         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
851     }
852
853     private static interface TransactionProxyOperation {
854         void run(TransactionProxy transactionProxy);
855     }
856
857     private void throttleOperation(TransactionProxyOperation operation) {
858         throttleOperation(operation, 1, true);
859     }
860
861     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
862         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
863                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
864     }
865
866     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
867         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
868                 Optional.<DataTree>absent());
869     }
870
871     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
872         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
873                 dataTreeOptional);
874     }
875
876
877     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
878         ActorSystem actorSystem = getSystem();
879         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
880
881         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
882         // we now allow one extra permit to be allowed for ready
883         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
884                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
885
886         doReturn(actorSystem.actorSelection(shardActorRef.path())).
887                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
888
889         if(shardFound) {
890             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
891                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
892             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
893                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
894
895         } else {
896             doReturn(Futures.failed(new Exception("not found")))
897                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
898         }
899
900         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
901
902         doReturn(incompleteFuture()).when(mockActorContext).
903         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
904                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
905
906         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
907
908         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
909
910         long start = System.nanoTime();
911
912         operation.run(transactionProxy);
913
914         long end = System.nanoTime();
915
916         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
917                 expectedCompletionTime, (end-start)),
918                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
919
920     }
921
922     private void completeOperation(TransactionProxyOperation operation){
923         completeOperation(operation, true);
924     }
925
926     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
927         ActorSystem actorSystem = getSystem();
928         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
929
930         doReturn(actorSystem.actorSelection(shardActorRef.path())).
931                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
932
933         if(shardFound) {
934             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
935                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
936         } else {
937             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
938                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
939         }
940
941         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
942         String actorPath = txActorRef.path().toString();
943         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
944                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
945                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
946
947         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
948
949         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
950                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
951                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
952
953         doReturn(true).when(mockActorContext).isPathLocal(anyString());
954
955         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
956
957         long start = System.nanoTime();
958
959         operation.run(transactionProxy);
960
961         long end = System.nanoTime();
962
963         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
964         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
965                 expected, (end-start)), (end - start) <= expected);
966     }
967
968     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
969         ActorSystem actorSystem = getSystem();
970         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
971
972         doReturn(actorSystem.actorSelection(shardActorRef.path())).
973                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
974
975         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
976                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
977
978         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
979
980         long start = System.nanoTime();
981
982         operation.run(transactionProxy);
983
984         long end = System.nanoTime();
985
986         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
987         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
988                 expected, (end-start)), (end - start) <= expected);
989     }
990
991     private static Optional<DataTree> createDataTree(){
992         DataTree dataTree = mock(DataTree.class);
993         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
994         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
995         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
996
997         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
998         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
999
1000         return dataTreeOptional;
1001     }
1002
1003     private static Optional<DataTree> createDataTree(NormalizedNode readResponse){
1004         DataTree dataTree = mock(DataTree.class);
1005         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
1006         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
1007         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
1008
1009         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
1010         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
1011         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
1012
1013         return dataTreeOptional;
1014     }
1015
1016
1017     @Test
1018     public void testWriteCompletionForLocalShard(){
1019         completeOperationLocal(new TransactionProxyOperation() {
1020             @Override
1021             public void run(TransactionProxy transactionProxy) {
1022                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1023
1024                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1025
1026                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1027
1028             }
1029         }, createDataTree());
1030     }
1031
1032     @Test
1033     public void testWriteThrottlingWhenShardFound(){
1034         throttleOperation(new TransactionProxyOperation() {
1035             @Override
1036             public void run(TransactionProxy transactionProxy) {
1037                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1038
1039                 expectIncompleteBatchedModifications();
1040
1041                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1042
1043                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1044             }
1045         });
1046     }
1047
1048     @Test
1049     public void testWriteThrottlingWhenShardNotFound(){
1050         // Confirm that there is no throttling when the Shard is not found
1051         completeOperation(new TransactionProxyOperation() {
1052             @Override
1053             public void run(TransactionProxy transactionProxy) {
1054                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1055
1056                 expectBatchedModifications(2);
1057
1058                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1059
1060                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1061             }
1062         }, false);
1063
1064     }
1065
1066
1067     @Test
1068     public void testWriteCompletion(){
1069         completeOperation(new TransactionProxyOperation() {
1070             @Override
1071             public void run(TransactionProxy transactionProxy) {
1072                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1073
1074                 expectBatchedModifications(2);
1075
1076                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1077
1078                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1079             }
1080         });
1081     }
1082
1083     @Test
1084     public void testMergeThrottlingWhenShardFound(){
1085         throttleOperation(new TransactionProxyOperation() {
1086             @Override
1087             public void run(TransactionProxy transactionProxy) {
1088                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1089
1090                 expectIncompleteBatchedModifications();
1091
1092                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1093
1094                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1095             }
1096         });
1097     }
1098
1099     @Test
1100     public void testMergeThrottlingWhenShardNotFound(){
1101         completeOperation(new TransactionProxyOperation() {
1102             @Override
1103             public void run(TransactionProxy transactionProxy) {
1104                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1105
1106                 expectBatchedModifications(2);
1107
1108                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1109
1110                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1111             }
1112         }, false);
1113     }
1114
1115     @Test
1116     public void testMergeCompletion(){
1117         completeOperation(new TransactionProxyOperation() {
1118             @Override
1119             public void run(TransactionProxy transactionProxy) {
1120                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1121
1122                 expectBatchedModifications(2);
1123
1124                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1125
1126                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1127             }
1128         });
1129
1130     }
1131
1132     @Test
1133     public void testMergeCompletionForLocalShard(){
1134         completeOperationLocal(new TransactionProxyOperation() {
1135             @Override
1136             public void run(TransactionProxy transactionProxy) {
1137                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1138
1139                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1140
1141                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1142
1143             }
1144         }, createDataTree());
1145     }
1146
1147
1148     @Test
1149     public void testDeleteThrottlingWhenShardFound(){
1150
1151         throttleOperation(new TransactionProxyOperation() {
1152             @Override
1153             public void run(TransactionProxy transactionProxy) {
1154                 expectIncompleteBatchedModifications();
1155
1156                 transactionProxy.delete(TestModel.TEST_PATH);
1157
1158                 transactionProxy.delete(TestModel.TEST_PATH);
1159             }
1160         });
1161     }
1162
1163
1164     @Test
1165     public void testDeleteThrottlingWhenShardNotFound(){
1166
1167         completeOperation(new TransactionProxyOperation() {
1168             @Override
1169             public void run(TransactionProxy transactionProxy) {
1170                 expectBatchedModifications(2);
1171
1172                 transactionProxy.delete(TestModel.TEST_PATH);
1173
1174                 transactionProxy.delete(TestModel.TEST_PATH);
1175             }
1176         }, false);
1177     }
1178
1179     @Test
1180     public void testDeleteCompletionForLocalShard(){
1181         completeOperationLocal(new TransactionProxyOperation() {
1182             @Override
1183             public void run(TransactionProxy transactionProxy) {
1184
1185                 transactionProxy.delete(TestModel.TEST_PATH);
1186
1187                 transactionProxy.delete(TestModel.TEST_PATH);
1188             }
1189         }, createDataTree());
1190
1191     }
1192
1193     @Test
1194     public void testDeleteCompletion(){
1195         completeOperation(new TransactionProxyOperation() {
1196             @Override
1197             public void run(TransactionProxy transactionProxy) {
1198                 expectBatchedModifications(2);
1199
1200                 transactionProxy.delete(TestModel.TEST_PATH);
1201
1202                 transactionProxy.delete(TestModel.TEST_PATH);
1203             }
1204         });
1205
1206     }
1207
1208     @Test
1209     public void testReadThrottlingWhenShardFound(){
1210
1211         throttleOperation(new TransactionProxyOperation() {
1212             @Override
1213             public void run(TransactionProxy transactionProxy) {
1214                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1215                         any(ActorSelection.class), eqReadData());
1216
1217                 transactionProxy.read(TestModel.TEST_PATH);
1218
1219                 transactionProxy.read(TestModel.TEST_PATH);
1220             }
1221         });
1222     }
1223
1224     @Test
1225     public void testReadThrottlingWhenShardNotFound(){
1226
1227         completeOperation(new TransactionProxyOperation() {
1228             @Override
1229             public void run(TransactionProxy transactionProxy) {
1230                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1231                         any(ActorSelection.class), eqReadData());
1232
1233                 transactionProxy.read(TestModel.TEST_PATH);
1234
1235                 transactionProxy.read(TestModel.TEST_PATH);
1236             }
1237         }, false);
1238     }
1239
1240
1241     @Test
1242     public void testReadCompletion(){
1243         completeOperation(new TransactionProxyOperation() {
1244             @Override
1245             public void run(TransactionProxy transactionProxy) {
1246                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1247
1248                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1249                         any(ActorSelection.class), eqReadData());
1250
1251                 transactionProxy.read(TestModel.TEST_PATH);
1252
1253                 transactionProxy.read(TestModel.TEST_PATH);
1254             }
1255         });
1256
1257     }
1258
1259     @Test
1260     public void testReadCompletionForLocalShard(){
1261         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1262         completeOperationLocal(new TransactionProxyOperation() {
1263             @Override
1264             public void run(TransactionProxy transactionProxy) {
1265                 transactionProxy.read(TestModel.TEST_PATH);
1266
1267                 transactionProxy.read(TestModel.TEST_PATH);
1268             }
1269         }, createDataTree(nodeToRead));
1270
1271     }
1272
1273     @Test
1274     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1275         completeOperationLocal(new TransactionProxyOperation() {
1276             @Override
1277             public void run(TransactionProxy transactionProxy) {
1278                 transactionProxy.read(TestModel.TEST_PATH);
1279
1280                 transactionProxy.read(TestModel.TEST_PATH);
1281             }
1282         }, createDataTree());
1283
1284     }
1285
1286     @Test
1287     public void testExistsThrottlingWhenShardFound(){
1288
1289         throttleOperation(new TransactionProxyOperation() {
1290             @Override
1291             public void run(TransactionProxy transactionProxy) {
1292                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1293                         any(ActorSelection.class), eqDataExists());
1294
1295                 transactionProxy.exists(TestModel.TEST_PATH);
1296
1297                 transactionProxy.exists(TestModel.TEST_PATH);
1298             }
1299         });
1300     }
1301
1302     @Test
1303     public void testExistsThrottlingWhenShardNotFound(){
1304
1305         completeOperation(new TransactionProxyOperation() {
1306             @Override
1307             public void run(TransactionProxy transactionProxy) {
1308                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1309                         any(ActorSelection.class), eqDataExists());
1310
1311                 transactionProxy.exists(TestModel.TEST_PATH);
1312
1313                 transactionProxy.exists(TestModel.TEST_PATH);
1314             }
1315         }, false);
1316     }
1317
1318
1319     @Test
1320     public void testExistsCompletion(){
1321         completeOperation(new TransactionProxyOperation() {
1322             @Override
1323             public void run(TransactionProxy transactionProxy) {
1324                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1325                         any(ActorSelection.class), eqDataExists());
1326
1327                 transactionProxy.exists(TestModel.TEST_PATH);
1328
1329                 transactionProxy.exists(TestModel.TEST_PATH);
1330             }
1331         });
1332
1333     }
1334
1335     @Test
1336     public void testExistsCompletionForLocalShard(){
1337         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1338         completeOperationLocal(new TransactionProxyOperation() {
1339             @Override
1340             public void run(TransactionProxy transactionProxy) {
1341                 transactionProxy.exists(TestModel.TEST_PATH);
1342
1343                 transactionProxy.exists(TestModel.TEST_PATH);
1344             }
1345         }, createDataTree(nodeToRead));
1346
1347     }
1348
1349     @Test
1350     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1351         completeOperationLocal(new TransactionProxyOperation() {
1352             @Override
1353             public void run(TransactionProxy transactionProxy) {
1354                 transactionProxy.exists(TestModel.TEST_PATH);
1355
1356                 transactionProxy.exists(TestModel.TEST_PATH);
1357             }
1358         }, createDataTree());
1359
1360     }
1361     @Test
1362     public void testReadyThrottling(){
1363
1364         throttleOperation(new TransactionProxyOperation() {
1365             @Override
1366             public void run(TransactionProxy transactionProxy) {
1367                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1368
1369                 expectBatchedModifications(1);
1370
1371                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1372
1373                 transactionProxy.ready();
1374             }
1375         });
1376     }
1377
1378     @Test
1379     public void testReadyThrottlingWithTwoTransactionContexts(){
1380         throttleOperation(new TransactionProxyOperation() {
1381             @Override
1382             public void run(TransactionProxy transactionProxy) {
1383                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1384                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1385
1386                 expectBatchedModifications(2);
1387
1388                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1389                         any(ActorSelection.class), any(ReadyTransaction.class));
1390
1391                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1392
1393                 // Trying to write to Cars will cause another transaction context to get created
1394                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1395
1396                 // Now ready should block for both transaction contexts
1397                 transactionProxy.ready();
1398             }
1399         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1400     }
1401
1402     private void testModificationOperationBatching(TransactionType type) throws Exception {
1403         int shardBatchedModificationCount = 3;
1404         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1405
1406         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1407
1408         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1409
1410         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1411         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1412
1413         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1414         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1415
1416         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1417         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1418
1419         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1420         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1421
1422         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1423         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1424
1425         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1426         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1427
1428         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1429         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1430
1431         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1432
1433         transactionProxy.write(writePath1, writeNode1);
1434         transactionProxy.write(writePath2, writeNode2);
1435         transactionProxy.delete(deletePath1);
1436         transactionProxy.merge(mergePath1, mergeNode1);
1437         transactionProxy.merge(mergePath2, mergeNode2);
1438         transactionProxy.write(writePath3, writeNode3);
1439         transactionProxy.merge(mergePath3, mergeNode3);
1440         transactionProxy.delete(deletePath2);
1441
1442         // This sends the last batch.
1443         transactionProxy.ready();
1444
1445         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1446         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1447
1448         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1449                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1450
1451         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1452                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1453
1454         verifyBatchedModifications(batchedModifications.get(2), true, true,
1455                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1456
1457         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1458     }
1459
1460     @Test
1461     public void testReadWriteModificationOperationBatching() throws Throwable {
1462         testModificationOperationBatching(READ_WRITE);
1463     }
1464
1465     @Test
1466     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1467         testModificationOperationBatching(WRITE_ONLY);
1468     }
1469
1470     @Test
1471     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1472         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1473         testModificationOperationBatching(WRITE_ONLY);
1474     }
1475
1476     @Test
1477     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1478
1479         int shardBatchedModificationCount = 10;
1480         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1481
1482         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1483
1484         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1485
1486         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1487         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1488
1489         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1490         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1491
1492         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1493         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1494
1495         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1496         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1497
1498         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1499
1500         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1501                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1502
1503         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1504                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1505
1506         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1507                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1508
1509         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1510
1511         transactionProxy.write(writePath1, writeNode1);
1512         transactionProxy.write(writePath2, writeNode2);
1513
1514         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1515                 get(5, TimeUnit.SECONDS);
1516
1517         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1518         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1519
1520         transactionProxy.merge(mergePath1, mergeNode1);
1521         transactionProxy.merge(mergePath2, mergeNode2);
1522
1523         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1524
1525         transactionProxy.delete(deletePath);
1526
1527         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1528         assertEquals("Exists response", true, exists);
1529
1530         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1531         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1532
1533         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1534         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1535
1536         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1537                 new WriteModification(writePath2, writeNode2));
1538
1539         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1540                 new MergeModification(mergePath2, mergeNode2));
1541
1542         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1543
1544         InOrder inOrder = Mockito.inOrder(mockActorContext);
1545         inOrder.verify(mockActorContext).executeOperationAsync(
1546                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1547
1548         inOrder.verify(mockActorContext).executeOperationAsync(
1549                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1550
1551         inOrder.verify(mockActorContext).executeOperationAsync(
1552                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1553
1554         inOrder.verify(mockActorContext).executeOperationAsync(
1555                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1556
1557         inOrder.verify(mockActorContext).executeOperationAsync(
1558                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1559
1560         inOrder.verify(mockActorContext).executeOperationAsync(
1561                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1562     }
1563
1564     @Test
1565     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1566
1567         SchemaContext schemaContext = SchemaContextHelper.full();
1568         Configuration configuration = mock(Configuration.class);
1569         doReturn(configuration).when(mockActorContext).getConfiguration();
1570         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1571         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1572
1573         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1574         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1575
1576         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1577         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1578
1579         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1580
1581         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1582
1583         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1584
1585         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1586                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1587
1588         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1589
1590         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1591
1592         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1593
1594         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1595
1596         for(NormalizedNode<?,?> node : collection){
1597             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1598         }
1599
1600         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1601                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1602
1603         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1604
1605         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1606                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1607
1608         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1609     }
1610
1611
1612     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1613         ActorSystem actorSystem = getSystem();
1614         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1615
1616         doReturn(getSystem().actorSelection(shardActorRef.path())).
1617                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1618
1619         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1620                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1621
1622         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1623
1624         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1625
1626         doReturn(actorSystem.actorSelection(txActorRef.path())).
1627                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1628
1629         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1630                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1631                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1632
1633         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1634                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1635     }
1636 }