8426e26f1b8f200cccc2113f1995690eb56a8a01
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
52 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
55 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
56 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
57 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
58 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
59 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
60 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
61 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
62 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
63 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqSerializedReadData());
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqSerializedReadData());
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqSerializedReadData());
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqSerializedReadData());
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqSerializedDataExists());
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqSerializedDataExists());
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqSerializedDataExists());
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqSerializedDataExists());
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqSerializedReadData());
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqSerializedReadData());
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqSerializedReadData());
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqSerializedReadData());
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571     }
572
573     @Test
574     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
575         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
576         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577
578         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579
580         expectBatchedModificationsReady(actorRef, true);
581
582         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583
584         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585
586         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587
588         assertTrue(ready instanceof SingleCommitCohortProxy);
589
590         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591
592         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
593         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
594
595         verifyBatchedModifications(batchedModifications.get(0), false,
596                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
597
598         verifyBatchedModifications(batchedModifications.get(1), true, true);
599     }
600
601     @Test
602     public void testReadyWithReplyFailure() throws Exception {
603         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604
605         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606
607         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608
609         expectFailedBatchedModifications(actorRef);
610
611         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616
617         assertTrue(ready instanceof SingleCommitCohortProxy);
618
619         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
620     }
621
622     @Test
623     public void testReadyWithDebugContextEnabled() throws Exception {
624         dataStoreContextBuilder.transactionDebugContextEnabled(true);
625
626         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631
632         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637
638         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
639     }
640
641     @Test
642     public void testReadyWithLocalTransaction() throws Exception {
643         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644
645         doReturn(getSystem().actorSelection(shardActorRef.path())).
646                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
647
648         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
649                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         expectReadyLocalTransaction(shardActorRef, true);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
659         assertTrue(ready instanceof SingleCommitCohortProxy);
660         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
661     }
662
663     @Test
664     public void testReadyWithLocalTransactionWithFailure() throws Exception {
665         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666
667         doReturn(getSystem().actorSelection(shardActorRef.path())).
668                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
669
670         Optional<DataTree> mockDataTree = createDataTree();
671         DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification();
672         doThrow(new RuntimeException("mock")).when(mockModification).ready();
673
674         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
675                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676
677         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678
679         expectReadyLocalTransaction(shardActorRef, true);
680
681         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
682         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683
684         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685         assertTrue(ready instanceof SingleCommitCohortProxy);
686         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
687     }
688
689     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
690         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691
692         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693
694         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695
696         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699
700         transactionProxy.delete(TestModel.TEST_PATH);
701
702         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705
706         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
707     }
708
709     @Test
710     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
711         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
712     }
713
714     @Test
715     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
716         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
717     }
718
719     @Test
720     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
721         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
722     }
723
724     @Test
725     public void testReadyWithInvalidReplyMessageType() throws Exception {
726         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
727         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728
729         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
730
731         doReturn(Futures.successful(new Object())).when(mockActorContext).
732                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
733
734         expectBatchedModificationsReady(actorRef2);
735
736         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
737
738         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
739         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
740
741         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
742
743         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
744
745         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
746                 IllegalArgumentException.class);
747     }
748
749     @Test
750     public void testGetIdentifier() {
751         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
752         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
753
754         Object id = transactionProxy.getIdentifier();
755         assertNotNull("getIdentifier returned null", id);
756         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
757     }
758
759     @Test
760     public void testClose() throws Exception{
761         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
762
763         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
764                 eq(actorSelection(actorRef)), eqSerializedReadData());
765
766         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
767
768         transactionProxy.read(TestModel.TEST_PATH);
769
770         transactionProxy.close();
771
772         verify(mockActorContext).sendOperationAsync(
773                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
774     }
775
776
777     /**
778      * Method to test a local Tx actor. The Tx paths are matched to decide if the
779      * Tx actor is local or not. This is done by mocking the Tx actor path
780      * and the caller paths and ensuring that the paths have the remote-address format
781      *
782      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
783      * the paths returned for the actors for all the tests are not qualified remote paths.
784      * Hence are treated as non-local/remote actors. In short, all tests except
785      * few below run for remote actors
786      *
787      * @throws Exception
788      */
789     @Test
790     public void testLocalTxActorRead() throws Exception {
791         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
792         doReturn(true).when(mockActorContext).isPathLocal(anyString());
793
794         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
795
796         // negative test case with null as the reply
797         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
798             any(ActorSelection.class), eqReadData());
799
800         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
801             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
802
803         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
804
805         // test case with node as read data reply
806         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
807
808         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
809             any(ActorSelection.class), eqReadData());
810
811         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
812
813         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
814
815         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
816
817         // test for local data exists
818         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
819             any(ActorSelection.class), eqDataExists());
820
821         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
822
823         assertEquals("Exists response", true, exists);
824     }
825
826     @Test
827     public void testLocalTxActorReady() throws Exception {
828         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
829         doReturn(true).when(mockActorContext).isPathLocal(anyString());
830
831         expectBatchedModificationsReady(actorRef, true);
832
833         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
834
835         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
836         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
837
838         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
839
840         assertTrue(ready instanceof SingleCommitCohortProxy);
841
842         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
843     }
844
845     private static interface TransactionProxyOperation {
846         void run(TransactionProxy transactionProxy);
847     }
848
849     private void throttleOperation(TransactionProxyOperation operation) {
850         throttleOperation(operation, 1, true);
851     }
852
853     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
854         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
855                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
856     }
857
858     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
859         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
860                 Optional.<DataTree>absent());
861     }
862
863     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
864         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
865                 dataTreeOptional);
866     }
867
868
869     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
870         ActorSystem actorSystem = getSystem();
871         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
872
873         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
874         // we now allow one extra permit to be allowed for ready
875         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
876                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
877
878         doReturn(actorSystem.actorSelection(shardActorRef.path())).
879                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
880
881         if(shardFound) {
882             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
883                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
884             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
885                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
886
887         } else {
888             doReturn(Futures.failed(new Exception("not found")))
889                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
890         }
891
892         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
893
894         doReturn(incompleteFuture()).when(mockActorContext).
895         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
896                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
897
898         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
899
900         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
901
902         long start = System.nanoTime();
903
904         operation.run(transactionProxy);
905
906         long end = System.nanoTime();
907
908         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
909                 expectedCompletionTime, (end-start)),
910                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
911
912     }
913
914     private void completeOperation(TransactionProxyOperation operation){
915         completeOperation(operation, true);
916     }
917
918     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
919         ActorSystem actorSystem = getSystem();
920         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
921
922         doReturn(actorSystem.actorSelection(shardActorRef.path())).
923                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
924
925         if(shardFound) {
926             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
927                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
928         } else {
929             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
930                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
931         }
932
933         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
934         String actorPath = txActorRef.path().toString();
935         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
936                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
937                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
938
939         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
940
941         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
942                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
943                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
944
945         doReturn(true).when(mockActorContext).isPathLocal(anyString());
946
947         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
948
949         long start = System.nanoTime();
950
951         operation.run(transactionProxy);
952
953         long end = System.nanoTime();
954
955         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
956         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
957                 expected, (end-start)), (end - start) <= expected);
958     }
959
960     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
961         ActorSystem actorSystem = getSystem();
962         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
963
964         doReturn(actorSystem.actorSelection(shardActorRef.path())).
965                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
966
967         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
968                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
969
970         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
971
972         long start = System.nanoTime();
973
974         operation.run(transactionProxy);
975
976         long end = System.nanoTime();
977
978         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
979         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
980                 expected, (end-start)), (end - start) <= expected);
981     }
982
983     private static Optional<DataTree> createDataTree(){
984         DataTree dataTree = mock(DataTree.class);
985         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
986         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
987         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
988
989         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
990         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
991
992         return dataTreeOptional;
993     }
994
995     private static Optional<DataTree> createDataTree(NormalizedNode readResponse){
996         DataTree dataTree = mock(DataTree.class);
997         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
998         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
999         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
1000
1001         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
1002         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
1003         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
1004
1005         return dataTreeOptional;
1006     }
1007
1008
1009     @Test
1010     public void testWriteCompletionForLocalShard(){
1011         completeOperationLocal(new TransactionProxyOperation() {
1012             @Override
1013             public void run(TransactionProxy transactionProxy) {
1014                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1015
1016                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1017
1018                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1019
1020             }
1021         }, createDataTree());
1022     }
1023
1024     @Test
1025     public void testWriteThrottlingWhenShardFound(){
1026         throttleOperation(new TransactionProxyOperation() {
1027             @Override
1028             public void run(TransactionProxy transactionProxy) {
1029                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1030
1031                 expectIncompleteBatchedModifications();
1032
1033                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1034
1035                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1036             }
1037         });
1038     }
1039
1040     @Test
1041     public void testWriteThrottlingWhenShardNotFound(){
1042         // Confirm that there is no throttling when the Shard is not found
1043         completeOperation(new TransactionProxyOperation() {
1044             @Override
1045             public void run(TransactionProxy transactionProxy) {
1046                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1047
1048                 expectBatchedModifications(2);
1049
1050                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1051
1052                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1053             }
1054         }, false);
1055
1056     }
1057
1058
1059     @Test
1060     public void testWriteCompletion(){
1061         completeOperation(new TransactionProxyOperation() {
1062             @Override
1063             public void run(TransactionProxy transactionProxy) {
1064                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1065
1066                 expectBatchedModifications(2);
1067
1068                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1069
1070                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1071             }
1072         });
1073     }
1074
1075     @Test
1076     public void testMergeThrottlingWhenShardFound(){
1077         throttleOperation(new TransactionProxyOperation() {
1078             @Override
1079             public void run(TransactionProxy transactionProxy) {
1080                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1081
1082                 expectIncompleteBatchedModifications();
1083
1084                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1085
1086                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1087             }
1088         });
1089     }
1090
1091     @Test
1092     public void testMergeThrottlingWhenShardNotFound(){
1093         completeOperation(new TransactionProxyOperation() {
1094             @Override
1095             public void run(TransactionProxy transactionProxy) {
1096                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1097
1098                 expectBatchedModifications(2);
1099
1100                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1101
1102                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1103             }
1104         }, false);
1105     }
1106
1107     @Test
1108     public void testMergeCompletion(){
1109         completeOperation(new TransactionProxyOperation() {
1110             @Override
1111             public void run(TransactionProxy transactionProxy) {
1112                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1113
1114                 expectBatchedModifications(2);
1115
1116                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1117
1118                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1119             }
1120         });
1121
1122     }
1123
1124     @Test
1125     public void testMergeCompletionForLocalShard(){
1126         completeOperationLocal(new TransactionProxyOperation() {
1127             @Override
1128             public void run(TransactionProxy transactionProxy) {
1129                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1130
1131                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1132
1133                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1134
1135             }
1136         }, createDataTree());
1137     }
1138
1139
1140     @Test
1141     public void testDeleteThrottlingWhenShardFound(){
1142
1143         throttleOperation(new TransactionProxyOperation() {
1144             @Override
1145             public void run(TransactionProxy transactionProxy) {
1146                 expectIncompleteBatchedModifications();
1147
1148                 transactionProxy.delete(TestModel.TEST_PATH);
1149
1150                 transactionProxy.delete(TestModel.TEST_PATH);
1151             }
1152         });
1153     }
1154
1155
1156     @Test
1157     public void testDeleteThrottlingWhenShardNotFound(){
1158
1159         completeOperation(new TransactionProxyOperation() {
1160             @Override
1161             public void run(TransactionProxy transactionProxy) {
1162                 expectBatchedModifications(2);
1163
1164                 transactionProxy.delete(TestModel.TEST_PATH);
1165
1166                 transactionProxy.delete(TestModel.TEST_PATH);
1167             }
1168         }, false);
1169     }
1170
1171     @Test
1172     public void testDeleteCompletionForLocalShard(){
1173         completeOperationLocal(new TransactionProxyOperation() {
1174             @Override
1175             public void run(TransactionProxy transactionProxy) {
1176
1177                 transactionProxy.delete(TestModel.TEST_PATH);
1178
1179                 transactionProxy.delete(TestModel.TEST_PATH);
1180             }
1181         }, createDataTree());
1182
1183     }
1184
1185     @Test
1186     public void testDeleteCompletion(){
1187         completeOperation(new TransactionProxyOperation() {
1188             @Override
1189             public void run(TransactionProxy transactionProxy) {
1190                 expectBatchedModifications(2);
1191
1192                 transactionProxy.delete(TestModel.TEST_PATH);
1193
1194                 transactionProxy.delete(TestModel.TEST_PATH);
1195             }
1196         });
1197
1198     }
1199
1200     @Test
1201     public void testReadThrottlingWhenShardFound(){
1202
1203         throttleOperation(new TransactionProxyOperation() {
1204             @Override
1205             public void run(TransactionProxy transactionProxy) {
1206                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1207                         any(ActorSelection.class), eqReadData());
1208
1209                 transactionProxy.read(TestModel.TEST_PATH);
1210
1211                 transactionProxy.read(TestModel.TEST_PATH);
1212             }
1213         });
1214     }
1215
1216     @Test
1217     public void testReadThrottlingWhenShardNotFound(){
1218
1219         completeOperation(new TransactionProxyOperation() {
1220             @Override
1221             public void run(TransactionProxy transactionProxy) {
1222                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1223                         any(ActorSelection.class), eqReadData());
1224
1225                 transactionProxy.read(TestModel.TEST_PATH);
1226
1227                 transactionProxy.read(TestModel.TEST_PATH);
1228             }
1229         }, false);
1230     }
1231
1232
1233     @Test
1234     public void testReadCompletion(){
1235         completeOperation(new TransactionProxyOperation() {
1236             @Override
1237             public void run(TransactionProxy transactionProxy) {
1238                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1239
1240                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1241                         any(ActorSelection.class), eqReadData());
1242
1243                 transactionProxy.read(TestModel.TEST_PATH);
1244
1245                 transactionProxy.read(TestModel.TEST_PATH);
1246             }
1247         });
1248
1249     }
1250
1251     @Test
1252     public void testReadCompletionForLocalShard(){
1253         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1254         completeOperationLocal(new TransactionProxyOperation() {
1255             @Override
1256             public void run(TransactionProxy transactionProxy) {
1257                 transactionProxy.read(TestModel.TEST_PATH);
1258
1259                 transactionProxy.read(TestModel.TEST_PATH);
1260             }
1261         }, createDataTree(nodeToRead));
1262
1263     }
1264
1265     @Test
1266     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1267         completeOperationLocal(new TransactionProxyOperation() {
1268             @Override
1269             public void run(TransactionProxy transactionProxy) {
1270                 transactionProxy.read(TestModel.TEST_PATH);
1271
1272                 transactionProxy.read(TestModel.TEST_PATH);
1273             }
1274         }, createDataTree());
1275
1276     }
1277
1278     @Test
1279     public void testExistsThrottlingWhenShardFound(){
1280
1281         throttleOperation(new TransactionProxyOperation() {
1282             @Override
1283             public void run(TransactionProxy transactionProxy) {
1284                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1285                         any(ActorSelection.class), eqDataExists());
1286
1287                 transactionProxy.exists(TestModel.TEST_PATH);
1288
1289                 transactionProxy.exists(TestModel.TEST_PATH);
1290             }
1291         });
1292     }
1293
1294     @Test
1295     public void testExistsThrottlingWhenShardNotFound(){
1296
1297         completeOperation(new TransactionProxyOperation() {
1298             @Override
1299             public void run(TransactionProxy transactionProxy) {
1300                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1301                         any(ActorSelection.class), eqDataExists());
1302
1303                 transactionProxy.exists(TestModel.TEST_PATH);
1304
1305                 transactionProxy.exists(TestModel.TEST_PATH);
1306             }
1307         }, false);
1308     }
1309
1310
1311     @Test
1312     public void testExistsCompletion(){
1313         completeOperation(new TransactionProxyOperation() {
1314             @Override
1315             public void run(TransactionProxy transactionProxy) {
1316                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1317                         any(ActorSelection.class), eqDataExists());
1318
1319                 transactionProxy.exists(TestModel.TEST_PATH);
1320
1321                 transactionProxy.exists(TestModel.TEST_PATH);
1322             }
1323         });
1324
1325     }
1326
1327     @Test
1328     public void testExistsCompletionForLocalShard(){
1329         final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1330         completeOperationLocal(new TransactionProxyOperation() {
1331             @Override
1332             public void run(TransactionProxy transactionProxy) {
1333                 transactionProxy.exists(TestModel.TEST_PATH);
1334
1335                 transactionProxy.exists(TestModel.TEST_PATH);
1336             }
1337         }, createDataTree(nodeToRead));
1338
1339     }
1340
1341     @Test
1342     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1343         completeOperationLocal(new TransactionProxyOperation() {
1344             @Override
1345             public void run(TransactionProxy transactionProxy) {
1346                 transactionProxy.exists(TestModel.TEST_PATH);
1347
1348                 transactionProxy.exists(TestModel.TEST_PATH);
1349             }
1350         }, createDataTree());
1351
1352     }
1353     @Test
1354     public void testReadyThrottling(){
1355
1356         throttleOperation(new TransactionProxyOperation() {
1357             @Override
1358             public void run(TransactionProxy transactionProxy) {
1359                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1360
1361                 expectBatchedModifications(1);
1362
1363                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1364
1365                 transactionProxy.ready();
1366             }
1367         });
1368     }
1369
1370     @Test
1371     public void testReadyThrottlingWithTwoTransactionContexts(){
1372         throttleOperation(new TransactionProxyOperation() {
1373             @Override
1374             public void run(TransactionProxy transactionProxy) {
1375                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1376                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1377
1378                 expectBatchedModifications(2);
1379
1380                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1381
1382                 // Trying to write to Cars will cause another transaction context to get created
1383                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1384
1385                 // Now ready should block for both transaction contexts
1386                 transactionProxy.ready();
1387             }
1388         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1389     }
1390
1391     private void testModificationOperationBatching(TransactionType type) throws Exception {
1392         int shardBatchedModificationCount = 3;
1393         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1394
1395         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1396
1397         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1398
1399         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1400         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1401
1402         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1403         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1404
1405         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1406         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1407
1408         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1409         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1410
1411         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1412         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1413
1414         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1415         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1416
1417         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1418         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1419
1420         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1421
1422         transactionProxy.write(writePath1, writeNode1);
1423         transactionProxy.write(writePath2, writeNode2);
1424         transactionProxy.delete(deletePath1);
1425         transactionProxy.merge(mergePath1, mergeNode1);
1426         transactionProxy.merge(mergePath2, mergeNode2);
1427         transactionProxy.write(writePath3, writeNode3);
1428         transactionProxy.merge(mergePath3, mergeNode3);
1429         transactionProxy.delete(deletePath2);
1430
1431         // This sends the last batch.
1432         transactionProxy.ready();
1433
1434         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1435         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1436
1437         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1438                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1439
1440         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1441                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1442
1443         verifyBatchedModifications(batchedModifications.get(2), true, true,
1444                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1445
1446         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1447     }
1448
1449     @Test
1450     public void testReadWriteModificationOperationBatching() throws Throwable {
1451         testModificationOperationBatching(READ_WRITE);
1452     }
1453
1454     @Test
1455     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1456         testModificationOperationBatching(WRITE_ONLY);
1457     }
1458
1459     @Test
1460     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1461         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1462         testModificationOperationBatching(WRITE_ONLY);
1463     }
1464
1465     @Test
1466     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1467
1468         int shardBatchedModificationCount = 10;
1469         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1470
1471         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1472
1473         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1474
1475         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1476         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1477
1478         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1479         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1480
1481         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1482         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1483
1484         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1485         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1486
1487         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1488
1489         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1490                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1491
1492         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1493                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1494
1495         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1496                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1497
1498         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1499
1500         transactionProxy.write(writePath1, writeNode1);
1501         transactionProxy.write(writePath2, writeNode2);
1502
1503         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1504                 get(5, TimeUnit.SECONDS);
1505
1506         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1507         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1508
1509         transactionProxy.merge(mergePath1, mergeNode1);
1510         transactionProxy.merge(mergePath2, mergeNode2);
1511
1512         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1513
1514         transactionProxy.delete(deletePath);
1515
1516         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1517         assertEquals("Exists response", true, exists);
1518
1519         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1520         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1521
1522         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1523         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1524
1525         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1526                 new WriteModification(writePath2, writeNode2));
1527
1528         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1529                 new MergeModification(mergePath2, mergeNode2));
1530
1531         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1532
1533         InOrder inOrder = Mockito.inOrder(mockActorContext);
1534         inOrder.verify(mockActorContext).executeOperationAsync(
1535                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1536
1537         inOrder.verify(mockActorContext).executeOperationAsync(
1538                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1539
1540         inOrder.verify(mockActorContext).executeOperationAsync(
1541                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1542
1543         inOrder.verify(mockActorContext).executeOperationAsync(
1544                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1545
1546         inOrder.verify(mockActorContext).executeOperationAsync(
1547                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1548
1549         inOrder.verify(mockActorContext).executeOperationAsync(
1550                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1551     }
1552
1553     @Test
1554     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1555
1556         SchemaContext schemaContext = SchemaContextHelper.full();
1557         Configuration configuration = mock(Configuration.class);
1558         doReturn(configuration).when(mockActorContext).getConfiguration();
1559         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1560         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1561
1562         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1563         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1564
1565         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1566         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1567
1568         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1569
1570         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1571
1572         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1573
1574         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1575                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1576
1577         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1578
1579         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1580
1581         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1582
1583         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1584
1585         for(NormalizedNode<?,?> node : collection){
1586             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1587         }
1588
1589         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1590                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1591
1592         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1593
1594         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1595                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1596
1597         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1598     }
1599
1600
1601     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1602         ActorSystem actorSystem = getSystem();
1603         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1604
1605         doReturn(getSystem().actorSelection(shardActorRef.path())).
1606                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1607
1608         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1609                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1610
1611         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1612
1613         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1614
1615         doReturn(actorSystem.actorSelection(txActorRef.path())).
1616                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1617
1618         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1619                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1620                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1621
1622         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1623                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1624     }
1625 }