abb3d27249735931715080ae5396ccd72cc14c6e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
52 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
61 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
62 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqSerializedReadData());
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqSerializedReadData());
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqSerializedReadData());
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqSerializedReadData());
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqSerializedDataExists());
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqSerializedDataExists());
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqSerializedDataExists());
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqSerializedDataExists());
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqSerializedReadData());
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqSerializedReadData());
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqSerializedReadData());
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqSerializedReadData());
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571     }
572
573     @Test
574     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
575         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
576         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577
578         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579
580         expectBatchedModificationsReady(actorRef, true);
581
582         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583
584         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585
586         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587
588         assertTrue(ready instanceof SingleCommitCohortProxy);
589
590         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591
592         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
593         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
594
595         verifyBatchedModifications(batchedModifications.get(0), false,
596                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
597
598         verifyBatchedModifications(batchedModifications.get(1), true, true);
599     }
600
601     @Test
602     public void testReadyWithReplyFailure() throws Exception {
603         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604
605         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606
607         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608
609         expectFailedBatchedModifications(actorRef);
610
611         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616
617         assertTrue(ready instanceof SingleCommitCohortProxy);
618
619         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
620     }
621
622     @Test
623     public void testReadyWithDebugContextEnabled() throws Exception {
624         dataStoreContextBuilder.transactionDebugContextEnabled(true);
625
626         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631
632         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637
638         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
639     }
640
641     @Test
642     public void testReadyWithLocalTransaction() throws Exception {
643         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644
645         doReturn(getSystem().actorSelection(shardActorRef.path())).
646                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
647
648         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
649                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         expectReadyLocalTransaction(shardActorRef, true);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
659         assertTrue(ready instanceof SingleCommitCohortProxy);
660         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
661     }
662
663     @Test
664     public void testReadyWithLocalTransactionWithFailure() throws Exception {
665         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666
667         doReturn(getSystem().actorSelection(shardActorRef.path())).
668                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
669
670         Optional<DataTree> mockDataTree = createDataTree();
671         DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification();
672         doThrow(new RuntimeException("mock")).when(mockModification).ready();
673
674         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
675                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676
677         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678
679         expectReadyLocalTransaction(shardActorRef, true);
680
681         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
682         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683
684         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685         assertTrue(ready instanceof SingleCommitCohortProxy);
686         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
687     }
688
689     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
690         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691
692         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693
694         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695
696         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699
700         transactionProxy.delete(TestModel.TEST_PATH);
701
702         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705
706         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
707     }
708
709     @Test
710     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
711         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
712     }
713
714     @Test
715     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
716         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
717     }
718
719     @Test
720     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
721         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
722     }
723
724     @Test
725     public void testReadyWithInvalidReplyMessageType() throws Exception {
726         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
727         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728
729         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
730
731         doReturn(Futures.successful(new Object())).when(mockActorContext).
732                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
733
734         expectBatchedModificationsReady(actorRef2);
735
736         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
737
738         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
739         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
740
741         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
742
743         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
744
745         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
746                 IllegalArgumentException.class);
747     }
748
749     @Test
750     public void testGetIdentifier() {
751         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
752         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
753
754         Object id = transactionProxy.getIdentifier();
755         assertNotNull("getIdentifier returned null", id);
756         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
757     }
758
759     @Test
760     public void testClose() throws Exception{
761         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
762
763         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
764                 eq(actorSelection(actorRef)), eqSerializedReadData());
765
766         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
767
768         transactionProxy.read(TestModel.TEST_PATH);
769
770         transactionProxy.close();
771
772         verify(mockActorContext).sendOperationAsync(
773                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
774     }
775
776
777     /**
778      * Method to test a local Tx actor. The Tx paths are matched to decide if the
779      * Tx actor is local or not. This is done by mocking the Tx actor path
780      * and the caller paths and ensuring that the paths have the remote-address format
781      *
782      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
783      * the paths returned for the actors for all the tests are not qualified remote paths.
784      * Hence are treated as non-local/remote actors. In short, all tests except
785      * few below run for remote actors
786      *
787      * @throws Exception
788      */
789     @Test
790     public void testLocalTxActorRead() throws Exception {
791         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
792         doReturn(true).when(mockActorContext).isPathLocal(anyString());
793
794         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
795
796         // negative test case with null as the reply
797         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
798             any(ActorSelection.class), eqReadData());
799
800         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
801             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
802
803         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
804
805         // test case with node as read data reply
806         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
807
808         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
809             any(ActorSelection.class), eqReadData());
810
811         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
812
813         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
814
815         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
816
817         // test for local data exists
818         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
819             any(ActorSelection.class), eqDataExists());
820
821         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
822
823         assertEquals("Exists response", true, exists);
824     }
825
826     @Test
827     public void testLocalTxActorReady() throws Exception {
828         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
829         doReturn(true).when(mockActorContext).isPathLocal(anyString());
830
831         expectBatchedModificationsReady(actorRef, true);
832
833         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
834
835         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
836         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
837
838         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
839
840         assertTrue(ready instanceof SingleCommitCohortProxy);
841
842         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
843     }
844
845     private static interface TransactionProxyOperation {
846         void run(TransactionProxy transactionProxy);
847     }
848
849     private void throttleOperation(TransactionProxyOperation operation) {
850         throttleOperation(operation, 1, true);
851     }
852
853     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
854         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
855                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
856     }
857
858     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
859         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
860                 Optional.<DataTree>absent());
861     }
862
863     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
864         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
865                 dataTreeOptional);
866     }
867
868
869     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
870         ActorSystem actorSystem = getSystem();
871         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
872
873         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
874         // we now allow one extra permit to be allowed for ready
875         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
876                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
877
878         doReturn(actorSystem.actorSelection(shardActorRef.path())).
879                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
880
881         if(shardFound) {
882             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
883                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
884             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
885                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
886
887         } else {
888             doReturn(Futures.failed(new Exception("not found")))
889                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
890         }
891
892         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
893
894         doReturn(incompleteFuture()).when(mockActorContext).
895         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
896                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
897
898         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
899
900         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
901
902         long start = System.nanoTime();
903
904         operation.run(transactionProxy);
905
906         long end = System.nanoTime();
907
908         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
909                 expectedCompletionTime, (end-start)),
910                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
911
912     }
913
914     private void completeOperation(TransactionProxyOperation operation){
915         completeOperation(operation, true);
916     }
917
918     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
919         ActorSystem actorSystem = getSystem();
920         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
921
922         doReturn(actorSystem.actorSelection(shardActorRef.path())).
923                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
924
925         if(shardFound) {
926             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
927                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
928         } else {
929             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
930                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
931         }
932
933         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
934         String actorPath = txActorRef.path().toString();
935         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
936                 DataStoreVersions.CURRENT_VERSION);
937
938         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
939
940         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
941                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
942                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
943
944         doReturn(true).when(mockActorContext).isPathLocal(anyString());
945
946         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
947
948         long start = System.nanoTime();
949
950         operation.run(transactionProxy);
951
952         long end = System.nanoTime();
953
954         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
955         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
956                 expected, (end-start)), (end - start) <= expected);
957     }
958
959     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
960         ActorSystem actorSystem = getSystem();
961         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
962
963         doReturn(actorSystem.actorSelection(shardActorRef.path())).
964                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
965
966         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
967                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
968
969         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
970
971         long start = System.nanoTime();
972
973         operation.run(transactionProxy);
974
975         long end = System.nanoTime();
976
977         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
978         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
979                 expected, (end-start)), (end - start) <= expected);
980     }
981
982     private static Optional<DataTree> createDataTree(){
983         DataTree dataTree = mock(DataTree.class);
984         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
985         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
986         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
987
988         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
989         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
990
991         return dataTreeOptional;
992     }
993
994     private static Optional<DataTree> createDataTree(NormalizedNode<?, ?> readResponse){
995         DataTree dataTree = mock(DataTree.class);
996         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
997         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
998         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
999
1000         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
1001         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
1002         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
1003
1004         return dataTreeOptional;
1005     }
1006
1007
1008     @Test
1009     public void testWriteCompletionForLocalShard(){
1010         completeOperationLocal(new TransactionProxyOperation() {
1011             @Override
1012             public void run(TransactionProxy transactionProxy) {
1013                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1014
1015                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1016
1017                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1018
1019             }
1020         }, createDataTree());
1021     }
1022
1023     @Test
1024     public void testWriteThrottlingWhenShardFound(){
1025         throttleOperation(new TransactionProxyOperation() {
1026             @Override
1027             public void run(TransactionProxy transactionProxy) {
1028                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1029
1030                 expectIncompleteBatchedModifications();
1031
1032                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1033
1034                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1035             }
1036         });
1037     }
1038
1039     @Test
1040     public void testWriteThrottlingWhenShardNotFound(){
1041         // Confirm that there is no throttling when the Shard is not found
1042         completeOperation(new TransactionProxyOperation() {
1043             @Override
1044             public void run(TransactionProxy transactionProxy) {
1045                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1046
1047                 expectBatchedModifications(2);
1048
1049                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1050
1051                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1052             }
1053         }, false);
1054
1055     }
1056
1057
1058     @Test
1059     public void testWriteCompletion(){
1060         completeOperation(new TransactionProxyOperation() {
1061             @Override
1062             public void run(TransactionProxy transactionProxy) {
1063                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1064
1065                 expectBatchedModifications(2);
1066
1067                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1068
1069                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1070             }
1071         });
1072     }
1073
1074     @Test
1075     public void testMergeThrottlingWhenShardFound(){
1076         throttleOperation(new TransactionProxyOperation() {
1077             @Override
1078             public void run(TransactionProxy transactionProxy) {
1079                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1080
1081                 expectIncompleteBatchedModifications();
1082
1083                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1084
1085                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1086             }
1087         });
1088     }
1089
1090     @Test
1091     public void testMergeThrottlingWhenShardNotFound(){
1092         completeOperation(new TransactionProxyOperation() {
1093             @Override
1094             public void run(TransactionProxy transactionProxy) {
1095                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1096
1097                 expectBatchedModifications(2);
1098
1099                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1100
1101                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1102             }
1103         }, false);
1104     }
1105
1106     @Test
1107     public void testMergeCompletion(){
1108         completeOperation(new TransactionProxyOperation() {
1109             @Override
1110             public void run(TransactionProxy transactionProxy) {
1111                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1112
1113                 expectBatchedModifications(2);
1114
1115                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1116
1117                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1118             }
1119         });
1120
1121     }
1122
1123     @Test
1124     public void testMergeCompletionForLocalShard(){
1125         completeOperationLocal(new TransactionProxyOperation() {
1126             @Override
1127             public void run(TransactionProxy transactionProxy) {
1128                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1129
1130                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1131
1132                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1133
1134             }
1135         }, createDataTree());
1136     }
1137
1138
1139     @Test
1140     public void testDeleteThrottlingWhenShardFound(){
1141
1142         throttleOperation(new TransactionProxyOperation() {
1143             @Override
1144             public void run(TransactionProxy transactionProxy) {
1145                 expectIncompleteBatchedModifications();
1146
1147                 transactionProxy.delete(TestModel.TEST_PATH);
1148
1149                 transactionProxy.delete(TestModel.TEST_PATH);
1150             }
1151         });
1152     }
1153
1154
1155     @Test
1156     public void testDeleteThrottlingWhenShardNotFound(){
1157
1158         completeOperation(new TransactionProxyOperation() {
1159             @Override
1160             public void run(TransactionProxy transactionProxy) {
1161                 expectBatchedModifications(2);
1162
1163                 transactionProxy.delete(TestModel.TEST_PATH);
1164
1165                 transactionProxy.delete(TestModel.TEST_PATH);
1166             }
1167         }, false);
1168     }
1169
1170     @Test
1171     public void testDeleteCompletionForLocalShard(){
1172         completeOperationLocal(new TransactionProxyOperation() {
1173             @Override
1174             public void run(TransactionProxy transactionProxy) {
1175
1176                 transactionProxy.delete(TestModel.TEST_PATH);
1177
1178                 transactionProxy.delete(TestModel.TEST_PATH);
1179             }
1180         }, createDataTree());
1181
1182     }
1183
1184     @Test
1185     public void testDeleteCompletion(){
1186         completeOperation(new TransactionProxyOperation() {
1187             @Override
1188             public void run(TransactionProxy transactionProxy) {
1189                 expectBatchedModifications(2);
1190
1191                 transactionProxy.delete(TestModel.TEST_PATH);
1192
1193                 transactionProxy.delete(TestModel.TEST_PATH);
1194             }
1195         });
1196
1197     }
1198
1199     @Test
1200     public void testReadThrottlingWhenShardFound(){
1201
1202         throttleOperation(new TransactionProxyOperation() {
1203             @Override
1204             public void run(TransactionProxy transactionProxy) {
1205                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1206                         any(ActorSelection.class), eqReadData());
1207
1208                 transactionProxy.read(TestModel.TEST_PATH);
1209
1210                 transactionProxy.read(TestModel.TEST_PATH);
1211             }
1212         });
1213     }
1214
1215     @Test
1216     public void testReadThrottlingWhenShardNotFound(){
1217
1218         completeOperation(new TransactionProxyOperation() {
1219             @Override
1220             public void run(TransactionProxy transactionProxy) {
1221                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1222                         any(ActorSelection.class), eqReadData());
1223
1224                 transactionProxy.read(TestModel.TEST_PATH);
1225
1226                 transactionProxy.read(TestModel.TEST_PATH);
1227             }
1228         }, false);
1229     }
1230
1231
1232     @Test
1233     public void testReadCompletion(){
1234         completeOperation(new TransactionProxyOperation() {
1235             @Override
1236             public void run(TransactionProxy transactionProxy) {
1237                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1238
1239                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1240                         any(ActorSelection.class), eqReadData());
1241
1242                 transactionProxy.read(TestModel.TEST_PATH);
1243
1244                 transactionProxy.read(TestModel.TEST_PATH);
1245             }
1246         });
1247
1248     }
1249
1250     @Test
1251     public void testReadCompletionForLocalShard(){
1252         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1253         completeOperationLocal(new TransactionProxyOperation() {
1254             @Override
1255             public void run(TransactionProxy transactionProxy) {
1256                 transactionProxy.read(TestModel.TEST_PATH);
1257
1258                 transactionProxy.read(TestModel.TEST_PATH);
1259             }
1260         }, createDataTree(nodeToRead));
1261
1262     }
1263
1264     @Test
1265     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1266         completeOperationLocal(new TransactionProxyOperation() {
1267             @Override
1268             public void run(TransactionProxy transactionProxy) {
1269                 transactionProxy.read(TestModel.TEST_PATH);
1270
1271                 transactionProxy.read(TestModel.TEST_PATH);
1272             }
1273         }, createDataTree());
1274
1275     }
1276
1277     @Test
1278     public void testExistsThrottlingWhenShardFound(){
1279
1280         throttleOperation(new TransactionProxyOperation() {
1281             @Override
1282             public void run(TransactionProxy transactionProxy) {
1283                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1284                         any(ActorSelection.class), eqDataExists());
1285
1286                 transactionProxy.exists(TestModel.TEST_PATH);
1287
1288                 transactionProxy.exists(TestModel.TEST_PATH);
1289             }
1290         });
1291     }
1292
1293     @Test
1294     public void testExistsThrottlingWhenShardNotFound(){
1295
1296         completeOperation(new TransactionProxyOperation() {
1297             @Override
1298             public void run(TransactionProxy transactionProxy) {
1299                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1300                         any(ActorSelection.class), eqDataExists());
1301
1302                 transactionProxy.exists(TestModel.TEST_PATH);
1303
1304                 transactionProxy.exists(TestModel.TEST_PATH);
1305             }
1306         }, false);
1307     }
1308
1309
1310     @Test
1311     public void testExistsCompletion(){
1312         completeOperation(new TransactionProxyOperation() {
1313             @Override
1314             public void run(TransactionProxy transactionProxy) {
1315                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1316                         any(ActorSelection.class), eqDataExists());
1317
1318                 transactionProxy.exists(TestModel.TEST_PATH);
1319
1320                 transactionProxy.exists(TestModel.TEST_PATH);
1321             }
1322         });
1323
1324     }
1325
1326     @Test
1327     public void testExistsCompletionForLocalShard(){
1328         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1329         completeOperationLocal(new TransactionProxyOperation() {
1330             @Override
1331             public void run(TransactionProxy transactionProxy) {
1332                 transactionProxy.exists(TestModel.TEST_PATH);
1333
1334                 transactionProxy.exists(TestModel.TEST_PATH);
1335             }
1336         }, createDataTree(nodeToRead));
1337
1338     }
1339
1340     @Test
1341     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1342         completeOperationLocal(new TransactionProxyOperation() {
1343             @Override
1344             public void run(TransactionProxy transactionProxy) {
1345                 transactionProxy.exists(TestModel.TEST_PATH);
1346
1347                 transactionProxy.exists(TestModel.TEST_PATH);
1348             }
1349         }, createDataTree());
1350
1351     }
1352     @Test
1353     public void testReadyThrottling(){
1354
1355         throttleOperation(new TransactionProxyOperation() {
1356             @Override
1357             public void run(TransactionProxy transactionProxy) {
1358                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1359
1360                 expectBatchedModifications(1);
1361
1362                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1363
1364                 transactionProxy.ready();
1365             }
1366         });
1367     }
1368
1369     @Test
1370     public void testReadyThrottlingWithTwoTransactionContexts(){
1371         throttleOperation(new TransactionProxyOperation() {
1372             @Override
1373             public void run(TransactionProxy transactionProxy) {
1374                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1375                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1376
1377                 expectBatchedModifications(2);
1378
1379                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1380
1381                 // Trying to write to Cars will cause another transaction context to get created
1382                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1383
1384                 // Now ready should block for both transaction contexts
1385                 transactionProxy.ready();
1386             }
1387         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1388     }
1389
1390     private void testModificationOperationBatching(TransactionType type) throws Exception {
1391         int shardBatchedModificationCount = 3;
1392         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1393
1394         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1395
1396         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1397
1398         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1399         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1400
1401         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1402         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1403
1404         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1405         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1406
1407         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1408         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1409
1410         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1411         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1412
1413         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1414         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1415
1416         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1417         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1418
1419         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1420
1421         transactionProxy.write(writePath1, writeNode1);
1422         transactionProxy.write(writePath2, writeNode2);
1423         transactionProxy.delete(deletePath1);
1424         transactionProxy.merge(mergePath1, mergeNode1);
1425         transactionProxy.merge(mergePath2, mergeNode2);
1426         transactionProxy.write(writePath3, writeNode3);
1427         transactionProxy.merge(mergePath3, mergeNode3);
1428         transactionProxy.delete(deletePath2);
1429
1430         // This sends the last batch.
1431         transactionProxy.ready();
1432
1433         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1434         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1435
1436         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1437                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1438
1439         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1440                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1441
1442         verifyBatchedModifications(batchedModifications.get(2), true, true,
1443                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1444
1445         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1446     }
1447
1448     @Test
1449     public void testReadWriteModificationOperationBatching() throws Throwable {
1450         testModificationOperationBatching(READ_WRITE);
1451     }
1452
1453     @Test
1454     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1455         testModificationOperationBatching(WRITE_ONLY);
1456     }
1457
1458     @Test
1459     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1460         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1461         testModificationOperationBatching(WRITE_ONLY);
1462     }
1463
1464     @Test
1465     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1466
1467         int shardBatchedModificationCount = 10;
1468         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1469
1470         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1471
1472         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1473
1474         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1475         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1476
1477         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1478         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1479
1480         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1481         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1482
1483         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1484         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1485
1486         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1487
1488         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1489                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1490
1491         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1492                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1493
1494         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1495                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1496
1497         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1498
1499         transactionProxy.write(writePath1, writeNode1);
1500         transactionProxy.write(writePath2, writeNode2);
1501
1502         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1503                 get(5, TimeUnit.SECONDS);
1504
1505         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1506         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1507
1508         transactionProxy.merge(mergePath1, mergeNode1);
1509         transactionProxy.merge(mergePath2, mergeNode2);
1510
1511         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1512
1513         transactionProxy.delete(deletePath);
1514
1515         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1516         assertEquals("Exists response", true, exists);
1517
1518         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1519         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1520
1521         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1522         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1523
1524         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1525                 new WriteModification(writePath2, writeNode2));
1526
1527         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1528                 new MergeModification(mergePath2, mergeNode2));
1529
1530         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1531
1532         InOrder inOrder = Mockito.inOrder(mockActorContext);
1533         inOrder.verify(mockActorContext).executeOperationAsync(
1534                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1535
1536         inOrder.verify(mockActorContext).executeOperationAsync(
1537                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1538
1539         inOrder.verify(mockActorContext).executeOperationAsync(
1540                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1541
1542         inOrder.verify(mockActorContext).executeOperationAsync(
1543                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1544
1545         inOrder.verify(mockActorContext).executeOperationAsync(
1546                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1547
1548         inOrder.verify(mockActorContext).executeOperationAsync(
1549                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1550     }
1551
1552     @Test
1553     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1554
1555         SchemaContext schemaContext = SchemaContextHelper.full();
1556         Configuration configuration = mock(Configuration.class);
1557         doReturn(configuration).when(mockActorContext).getConfiguration();
1558         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1559         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1560
1561         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1562         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1563
1564         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1565         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1566
1567         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1568
1569         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1570
1571         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1572
1573         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1574                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1575
1576         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1577
1578         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1579
1580         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1581
1582         @SuppressWarnings("unchecked")
1583         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1584
1585         for(NormalizedNode<?,?> node : collection){
1586             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1587         }
1588
1589         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1590                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1591
1592         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1593
1594         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1595                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1596
1597         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1598     }
1599
1600
1601     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1602         ActorSystem actorSystem = getSystem();
1603         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1604
1605         doReturn(getSystem().actorSelection(shardActorRef.path())).
1606                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1607
1608         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1609                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1610
1611         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1612
1613         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1614
1615         doReturn(actorSystem.actorSelection(txActorRef.path())).
1616                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1617
1618         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1619                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1620                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1621
1622         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1623                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1624     }
1625 }