Merge "Remove recorded modification Futures from TransactionContext"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
43 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
46 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
49 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
50 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import scala.concurrent.Promise;
63
64 @SuppressWarnings("resource")
65 public class TransactionProxyTest extends AbstractTransactionProxyTest {
66
67     @SuppressWarnings("serial")
68     static class TestException extends RuntimeException {
69     }
70
71     static interface Invoker {
72         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
73     }
74
75     @Test
76     public void testRead() throws Exception {
77         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
78
79         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
80
81         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
82                 eq(actorSelection(actorRef)), eqSerializedReadData());
83
84         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
85                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
86
87         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
88
89         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
90
91         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
92                 eq(actorSelection(actorRef)), eqSerializedReadData());
93
94         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
95
96         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
97
98         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
99     }
100
101     @Test(expected = ReadFailedException.class)
102     public void testReadWithInvalidReplyMessageType() throws Exception {
103         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
104
105         doReturn(Futures.successful(new Object())).when(mockActorContext).
106                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
107
108         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
109
110         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
111     }
112
113     @Test(expected = TestException.class)
114     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
115         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
116
117         doReturn(Futures.failed(new TestException())).when(mockActorContext).
118                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
119
120         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
121
122         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
123     }
124
125     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
126             throws Throwable {
127         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
128
129         if (exToThrow instanceof PrimaryNotFoundException) {
130             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
131         } else {
132             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
133                     when(mockActorContext).findPrimaryShardAsync(anyString());
134         }
135
136         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
137                 any(ActorSelection.class), any());
138
139         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
140
141         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
142     }
143
144     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
145         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
146             @Override
147             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
148                 return proxy.read(TestModel.TEST_PATH);
149             }
150         });
151     }
152
153     @Test(expected = PrimaryNotFoundException.class)
154     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
155         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
156     }
157
158     @Test(expected = TimeoutException.class)
159     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
160         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
161                 new Exception("reason")));
162     }
163
164     @Test(expected = TestException.class)
165     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
166         testReadWithExceptionOnInitialCreateTransaction(new TestException());
167     }
168
169     @Test
170     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
171         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
172
173         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174
175         expectBatchedModifications(actorRef, 1);
176
177         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
178                 eq(actorSelection(actorRef)), eqSerializedReadData());
179
180         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
181
182         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
183
184         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
185                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
186
187         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
188         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
189
190         InOrder inOrder = Mockito.inOrder(mockActorContext);
191         inOrder.verify(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
193
194         inOrder.verify(mockActorContext).executeOperationAsync(
195                 eq(actorSelection(actorRef)), eqSerializedReadData());
196     }
197
198     @Test(expected=IllegalStateException.class)
199     public void testReadPreConditionCheck() {
200         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
201         transactionProxy.read(TestModel.TEST_PATH);
202     }
203
204     @Test(expected=IllegalArgumentException.class)
205     public void testInvalidCreateTransactionReply() throws Throwable {
206         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
207
208         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
209             actorSelection(actorRef.path().toString());
210
211         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
212             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
213
214         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
215             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
216
217         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
218
219         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
220     }
221
222     @Test
223     public void testExists() throws Exception {
224         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
225
226         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
227
228         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
229                 eq(actorSelection(actorRef)), eqSerializedDataExists());
230
231         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
232
233         assertEquals("Exists response", false, exists);
234
235         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
236                 eq(actorSelection(actorRef)), eqSerializedDataExists());
237
238         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
239
240         assertEquals("Exists response", true, exists);
241     }
242
243     @Test(expected = PrimaryNotFoundException.class)
244     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
245         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
246             @Override
247             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
248                 return proxy.exists(TestModel.TEST_PATH);
249             }
250         });
251     }
252
253     @Test(expected = ReadFailedException.class)
254     public void testExistsWithInvalidReplyMessageType() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
256
257         doReturn(Futures.successful(new Object())).when(mockActorContext).
258                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
259
260         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
261                 READ_ONLY);
262
263         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
264     }
265
266     @Test(expected = TestException.class)
267     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
268         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
269
270         doReturn(Futures.failed(new TestException())).when(mockActorContext).
271                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
272
273         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
274
275         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
276     }
277
278     @Test
279     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
280         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
281
282         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
283
284         expectBatchedModifications(actorRef, 1);
285
286         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
287                 eq(actorSelection(actorRef)), eqSerializedDataExists());
288
289         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
290
291         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
292
293         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
294
295         assertEquals("Exists response", true, exists);
296
297         InOrder inOrder = Mockito.inOrder(mockActorContext);
298         inOrder.verify(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
300
301         inOrder.verify(mockActorContext).executeOperationAsync(
302                 eq(actorSelection(actorRef)), eqSerializedDataExists());
303     }
304
305     @Test(expected=IllegalStateException.class)
306     public void testExistsPreConditionCheck() {
307         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
308         transactionProxy.exists(TestModel.TEST_PATH);
309     }
310
311     @Test
312     public void testWrite() throws Exception {
313         dataStoreContextBuilder.shardBatchedModificationCount(1);
314         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
315
316         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
317
318         expectBatchedModifications(actorRef, 1);
319
320         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
321
322         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
323
324         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
325     }
326
327     @Test
328     public void testWriteAfterAsyncRead() throws Throwable {
329         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
330
331         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
332         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
333                 eq(getSystem().actorSelection(actorRef.path())),
334                 eqCreateTransaction(memberName, READ_WRITE));
335
336         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
337                 eq(actorSelection(actorRef)), eqSerializedReadData());
338
339         expectBatchedModifications(actorRef, 1);
340         expectReadyTransaction(actorRef);
341
342         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
343
344         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
345
346         final CountDownLatch readComplete = new CountDownLatch(1);
347         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
348         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
349                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
350                     @Override
351                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
352                         try {
353                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
354                         } catch (Exception e) {
355                             caughtEx.set(e);
356                         } finally {
357                             readComplete.countDown();
358                         }
359                     }
360
361                     @Override
362                     public void onFailure(Throwable t) {
363                         caughtEx.set(t);
364                         readComplete.countDown();
365                     }
366                 });
367
368         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
369
370         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
371
372         if(caughtEx.get() != null) {
373             throw caughtEx.get();
374         }
375
376         // This sends the batched modification.
377         transactionProxy.ready();
378
379         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
380     }
381
382     @Test(expected=IllegalStateException.class)
383     public void testWritePreConditionCheck() {
384         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
385         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
386     }
387
388     @Test(expected=IllegalStateException.class)
389     public void testWriteAfterReadyPreConditionCheck() {
390         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
391
392         transactionProxy.ready();
393
394         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
395     }
396
397     @Test
398     public void testMerge() throws Exception {
399         dataStoreContextBuilder.shardBatchedModificationCount(1);
400         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
401
402         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
403
404         expectBatchedModifications(actorRef, 1);
405
406         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
407
408         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
409
410         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
411     }
412
413     @Test
414     public void testDelete() throws Exception {
415         dataStoreContextBuilder.shardBatchedModificationCount(1);
416         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
417
418         expectBatchedModifications(actorRef, 1);
419
420         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
421
422         transactionProxy.delete(TestModel.TEST_PATH);
423
424         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
425     }
426
427     @Test
428     public void testReadyWithReadWrite() throws Exception {
429         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
430
431         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
432
433         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
434                 eq(actorSelection(actorRef)), eqSerializedReadData());
435
436         expectBatchedModifications(actorRef, 1);
437         expectReadyTransaction(actorRef);
438
439         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
440
441         transactionProxy.read(TestModel.TEST_PATH);
442
443         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
444
445         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
446
447         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
448
449         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
450
451         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
452
453         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
454                 isA(BatchedModifications.class));
455
456         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
457                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
458     }
459
460     @Test
461     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
462         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
463
464         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
465
466         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
467
468         expectBatchedModificationsReady(actorRef, 1);
469
470         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
471
472         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
473
474         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
475
476         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
477
478         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
479
480         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
481
482         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
483         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
484
485         verifyBatchedModifications(batchedModifications.get(0), true,
486                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
487
488         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
489                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
490     }
491
492     @Test
493     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
494         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
495         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
496
497         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
498
499         expectBatchedModificationsReady(actorRef, 1);
500
501         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
502
503         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
504
505         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
506
507         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
508
509         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
510
511         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
512
513         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
514         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
515
516         verifyBatchedModifications(batchedModifications.get(0), false,
517                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
518
519         verifyBatchedModifications(batchedModifications.get(1), true);
520
521         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
522                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
523     }
524
525     @Test
526     public void testReadyWithReplyFailure() throws Exception {
527         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
528
529         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
530
531         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
532
533         expectFailedBatchedModifications(actorRef);
534
535         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
536
537         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
538
539         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
540
541         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
542
543         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
544
545         verifyCohortFutures(proxy, TestException.class);
546     }
547
548     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
549         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
550
551         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
552
553         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
554
555         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
556
557         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
558
559         transactionProxy.delete(TestModel.TEST_PATH);
560
561         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
562
563         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
564
565         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
566
567         verifyCohortFutures(proxy, toThrow.getClass());
568     }
569
570     @Test
571     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
572         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
573     }
574
575     @Test
576     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
577         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
578     }
579
580     @Test
581     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
582         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
583     }
584
585     @Test
586     public void testReadyWithInvalidReplyMessageType() throws Exception {
587         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
588         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
589
590         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
591
592         //expectBatchedModifications(actorRef, 1);
593
594         doReturn(Futures.successful(new Object())).when(mockActorContext).
595                 executeOperationAsync(eq(actorSelection(actorRef)),
596                         isA(BatchedModifications.class));
597
598         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
599
600         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
601
602         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
603
604         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
605
606         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
607
608         verifyCohortFutures(proxy, IllegalArgumentException.class);
609     }
610
611     @Test
612     public void testGetIdentifier() {
613         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
614         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
615                 TransactionProxy.TransactionType.READ_ONLY);
616
617         Object id = transactionProxy.getIdentifier();
618         assertNotNull("getIdentifier returned null", id);
619         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
620     }
621
622     @Test
623     public void testClose() throws Exception{
624         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
625
626         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
627                 eq(actorSelection(actorRef)), eqSerializedReadData());
628
629         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
630
631         transactionProxy.read(TestModel.TEST_PATH);
632
633         transactionProxy.close();
634
635         verify(mockActorContext).sendOperationAsync(
636                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
637     }
638
639
640     /**
641      * Method to test a local Tx actor. The Tx paths are matched to decide if the
642      * Tx actor is local or not. This is done by mocking the Tx actor path
643      * and the caller paths and ensuring that the paths have the remote-address format
644      *
645      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
646      * the paths returned for the actors for all the tests are not qualified remote paths.
647      * Hence are treated as non-local/remote actors. In short, all tests except
648      * few below run for remote actors
649      *
650      * @throws Exception
651      */
652     @Test
653     public void testLocalTxActorRead() throws Exception {
654         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
655         doReturn(true).when(mockActorContext).isPathLocal(anyString());
656
657         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
658
659         // negative test case with null as the reply
660         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
661             any(ActorSelection.class), eqReadData());
662
663         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
664             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
665
666         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
667
668         // test case with node as read data reply
669         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
670
671         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
672             any(ActorSelection.class), eqReadData());
673
674         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
675
676         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
677
678         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
679
680         // test for local data exists
681         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
682             any(ActorSelection.class), eqDataExists());
683
684         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
685
686         assertEquals("Exists response", true, exists);
687     }
688
689     @Test
690     public void testLocalTxActorReady() throws Exception {
691         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
692         doReturn(true).when(mockActorContext).isPathLocal(anyString());
693
694         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
695                 any(ActorSelection.class), isA(BatchedModifications.class));
696
697         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
698
699         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
700         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
701
702         // testing ready
703         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
704             eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
705
706         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
707
708         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
709
710         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
711
712         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
713     }
714
715     private static interface TransactionProxyOperation {
716         void run(TransactionProxy transactionProxy);
717     }
718
719     private void throttleOperation(TransactionProxyOperation operation) {
720         throttleOperation(operation, 1, true);
721     }
722
723     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
724         ActorSystem actorSystem = getSystem();
725         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
726
727         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
728
729         doReturn(actorSystem.actorSelection(shardActorRef.path())).
730                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
731
732         if(shardFound) {
733             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
734                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
735         } else {
736             doReturn(Futures.failed(new Exception("not found")))
737                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
738         }
739
740         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
741         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
742                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
743                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
744
745         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
746                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
747                         eqCreateTransaction(memberName, READ_WRITE));
748
749         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
750
751         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
752
753         long start = System.nanoTime();
754
755         operation.run(transactionProxy);
756
757         long end = System.nanoTime();
758
759         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
760         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
761                 expected, (end-start)), (end - start) > expected);
762
763     }
764
765     private void completeOperation(TransactionProxyOperation operation){
766         completeOperation(operation, true);
767     }
768
769     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
770         ActorSystem actorSystem = getSystem();
771         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
772
773         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
774
775         doReturn(actorSystem.actorSelection(shardActorRef.path())).
776                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
777
778         if(shardFound) {
779             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
780                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
781         } else {
782             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
783                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
784         }
785
786         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
787         String actorPath = txActorRef.path().toString();
788         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
789                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
790                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
791
792         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
793
794         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
795                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
796                         eqCreateTransaction(memberName, READ_WRITE));
797
798         doReturn(true).when(mockActorContext).isPathLocal(anyString());
799
800         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
801
802         long start = System.nanoTime();
803
804         operation.run(transactionProxy);
805
806         long end = System.nanoTime();
807
808         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
809         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
810                 expected, (end-start)), (end - start) <= expected);
811     }
812
813     public void testWriteThrottling(boolean shardFound){
814
815         throttleOperation(new TransactionProxyOperation() {
816             @Override
817             public void run(TransactionProxy transactionProxy) {
818                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
819
820                 expectBatchedModifications(2);
821
822                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
823
824                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
825             }
826         }, 1, shardFound);
827     }
828
829     @Test
830     public void testWriteThrottlingWhenShardFound(){
831         dataStoreContextBuilder.shardBatchedModificationCount(1);
832         throttleOperation(new TransactionProxyOperation() {
833             @Override
834             public void run(TransactionProxy transactionProxy) {
835                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
836
837                 expectIncompleteBatchedModifications();
838
839                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
840
841                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
842             }
843         });
844     }
845
846     @Test
847     public void testWriteThrottlingWhenShardNotFound(){
848         // Confirm that there is no throttling when the Shard is not found
849         dataStoreContextBuilder.shardBatchedModificationCount(1);
850         completeOperation(new TransactionProxyOperation() {
851             @Override
852             public void run(TransactionProxy transactionProxy) {
853                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
854
855                 expectBatchedModifications(2);
856
857                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
858
859                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
860             }
861         }, false);
862
863     }
864
865
866     @Test
867     public void testWriteCompletion(){
868         dataStoreContextBuilder.shardBatchedModificationCount(1);
869         completeOperation(new TransactionProxyOperation() {
870             @Override
871             public void run(TransactionProxy transactionProxy) {
872                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
873
874                 expectBatchedModifications(2);
875
876                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
877
878                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
879             }
880         });
881     }
882
883     @Test
884     public void testMergeThrottlingWhenShardFound(){
885         dataStoreContextBuilder.shardBatchedModificationCount(1);
886         throttleOperation(new TransactionProxyOperation() {
887             @Override
888             public void run(TransactionProxy transactionProxy) {
889                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
890
891                 expectIncompleteBatchedModifications();
892
893                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
894
895                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
896             }
897         });
898     }
899
900     @Test
901     public void testMergeThrottlingWhenShardNotFound(){
902         dataStoreContextBuilder.shardBatchedModificationCount(1);
903         completeOperation(new TransactionProxyOperation() {
904             @Override
905             public void run(TransactionProxy transactionProxy) {
906                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
907
908                 expectBatchedModifications(2);
909
910                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
911
912                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
913             }
914         }, false);
915     }
916
917     @Test
918     public void testMergeCompletion(){
919         dataStoreContextBuilder.shardBatchedModificationCount(1);
920         completeOperation(new TransactionProxyOperation() {
921             @Override
922             public void run(TransactionProxy transactionProxy) {
923                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
924
925                 expectBatchedModifications(2);
926
927                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
928
929                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
930             }
931         });
932
933     }
934
935     @Test
936     public void testDeleteThrottlingWhenShardFound(){
937
938         throttleOperation(new TransactionProxyOperation() {
939             @Override
940             public void run(TransactionProxy transactionProxy) {
941                 expectIncompleteBatchedModifications();
942
943                 transactionProxy.delete(TestModel.TEST_PATH);
944
945                 transactionProxy.delete(TestModel.TEST_PATH);
946             }
947         });
948     }
949
950
951     @Test
952     public void testDeleteThrottlingWhenShardNotFound(){
953
954         completeOperation(new TransactionProxyOperation() {
955             @Override
956             public void run(TransactionProxy transactionProxy) {
957                 expectBatchedModifications(2);
958
959                 transactionProxy.delete(TestModel.TEST_PATH);
960
961                 transactionProxy.delete(TestModel.TEST_PATH);
962             }
963         }, false);
964     }
965
966     @Test
967     public void testDeleteCompletion(){
968         dataStoreContextBuilder.shardBatchedModificationCount(1);
969         completeOperation(new TransactionProxyOperation() {
970             @Override
971             public void run(TransactionProxy transactionProxy) {
972                 expectBatchedModifications(2);
973
974                 transactionProxy.delete(TestModel.TEST_PATH);
975
976                 transactionProxy.delete(TestModel.TEST_PATH);
977             }
978         });
979
980     }
981
982     @Test
983     public void testReadThrottlingWhenShardFound(){
984
985         throttleOperation(new TransactionProxyOperation() {
986             @Override
987             public void run(TransactionProxy transactionProxy) {
988                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
989                         any(ActorSelection.class), eqReadData());
990
991                 transactionProxy.read(TestModel.TEST_PATH);
992
993                 transactionProxy.read(TestModel.TEST_PATH);
994             }
995         });
996     }
997
998     @Test
999     public void testReadThrottlingWhenShardNotFound(){
1000
1001         completeOperation(new TransactionProxyOperation() {
1002             @Override
1003             public void run(TransactionProxy transactionProxy) {
1004                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1005                         any(ActorSelection.class), eqReadData());
1006
1007                 transactionProxy.read(TestModel.TEST_PATH);
1008
1009                 transactionProxy.read(TestModel.TEST_PATH);
1010             }
1011         }, false);
1012     }
1013
1014
1015     @Test
1016     public void testReadCompletion(){
1017         completeOperation(new TransactionProxyOperation() {
1018             @Override
1019             public void run(TransactionProxy transactionProxy) {
1020                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1021
1022                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1023                         any(ActorSelection.class), eqReadData());
1024
1025                 transactionProxy.read(TestModel.TEST_PATH);
1026
1027                 transactionProxy.read(TestModel.TEST_PATH);
1028             }
1029         });
1030
1031     }
1032
1033     @Test
1034     public void testExistsThrottlingWhenShardFound(){
1035
1036         throttleOperation(new TransactionProxyOperation() {
1037             @Override
1038             public void run(TransactionProxy transactionProxy) {
1039                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1040                         any(ActorSelection.class), eqDataExists());
1041
1042                 transactionProxy.exists(TestModel.TEST_PATH);
1043
1044                 transactionProxy.exists(TestModel.TEST_PATH);
1045             }
1046         });
1047     }
1048
1049     @Test
1050     public void testExistsThrottlingWhenShardNotFound(){
1051
1052         completeOperation(new TransactionProxyOperation() {
1053             @Override
1054             public void run(TransactionProxy transactionProxy) {
1055                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1056                         any(ActorSelection.class), eqDataExists());
1057
1058                 transactionProxy.exists(TestModel.TEST_PATH);
1059
1060                 transactionProxy.exists(TestModel.TEST_PATH);
1061             }
1062         }, false);
1063     }
1064
1065
1066     @Test
1067     public void testExistsCompletion(){
1068         completeOperation(new TransactionProxyOperation() {
1069             @Override
1070             public void run(TransactionProxy transactionProxy) {
1071                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1072                         any(ActorSelection.class), eqDataExists());
1073
1074                 transactionProxy.exists(TestModel.TEST_PATH);
1075
1076                 transactionProxy.exists(TestModel.TEST_PATH);
1077             }
1078         });
1079
1080     }
1081
1082     @Test
1083     public void testReadyThrottling(){
1084
1085         throttleOperation(new TransactionProxyOperation() {
1086             @Override
1087             public void run(TransactionProxy transactionProxy) {
1088                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1089
1090                 expectBatchedModifications(1);
1091
1092                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1093                         any(ActorSelection.class), any(ReadyTransaction.class));
1094
1095                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1096
1097                 transactionProxy.ready();
1098             }
1099         });
1100     }
1101
1102     @Test
1103     public void testReadyThrottlingWithTwoTransactionContexts(){
1104
1105         throttleOperation(new TransactionProxyOperation() {
1106             @Override
1107             public void run(TransactionProxy transactionProxy) {
1108                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1109                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1110
1111                 expectBatchedModifications(2);
1112
1113                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1114                         any(ActorSelection.class), any(ReadyTransaction.class));
1115
1116                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1117
1118                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1119
1120                 transactionProxy.ready();
1121             }
1122         }, 2, true);
1123     }
1124
1125     private void testModificationOperationBatching(TransactionType type) throws Exception {
1126         int shardBatchedModificationCount = 3;
1127         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1128
1129         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1130
1131         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1132
1133         expectReadyTransaction(actorRef);
1134
1135         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1136         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1137
1138         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1139         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1140
1141         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1142         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1143
1144         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1145         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1146
1147         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1148         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1149
1150         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1151         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1152
1153         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1154         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1155
1156         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1157
1158         transactionProxy.write(writePath1, writeNode1);
1159         transactionProxy.write(writePath2, writeNode2);
1160         transactionProxy.delete(deletePath1);
1161         transactionProxy.merge(mergePath1, mergeNode1);
1162         transactionProxy.merge(mergePath2, mergeNode2);
1163         transactionProxy.write(writePath3, writeNode3);
1164         transactionProxy.merge(mergePath3, mergeNode3);
1165         transactionProxy.delete(deletePath2);
1166
1167         // This sends the last batch.
1168         transactionProxy.ready();
1169
1170         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1171         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1172
1173         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1174                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1175
1176         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1177                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1178
1179         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
1180         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
1181                 new DeleteModification(deletePath2));
1182     }
1183
1184     @Test
1185     public void testReadWriteModificationOperationBatching() throws Throwable {
1186         testModificationOperationBatching(READ_WRITE);
1187     }
1188
1189     @Test
1190     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1191         testModificationOperationBatching(WRITE_ONLY);
1192     }
1193
1194     @Test
1195     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1196         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1197         testModificationOperationBatching(WRITE_ONLY);
1198     }
1199
1200     @Test
1201     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1202
1203         int shardBatchedModificationCount = 10;
1204         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1205
1206         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1207
1208         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1209
1210         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1211         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1212
1213         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1214         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1215
1216         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1217         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218
1219         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1220         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1221
1222         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1223
1224         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1225                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1226
1227         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1228                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1229
1230         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1231                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1232
1233         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1234
1235         transactionProxy.write(writePath1, writeNode1);
1236         transactionProxy.write(writePath2, writeNode2);
1237
1238         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1239                 get(5, TimeUnit.SECONDS);
1240
1241         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1242         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1243
1244         transactionProxy.merge(mergePath1, mergeNode1);
1245         transactionProxy.merge(mergePath2, mergeNode2);
1246
1247         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1248
1249         transactionProxy.delete(deletePath);
1250
1251         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1252         assertEquals("Exists response", true, exists);
1253
1254         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1255         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1256
1257         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1258         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1259
1260         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1261                 new WriteModification(writePath2, writeNode2));
1262
1263         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1264                 new MergeModification(mergePath2, mergeNode2));
1265
1266         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1267
1268         InOrder inOrder = Mockito.inOrder(mockActorContext);
1269         inOrder.verify(mockActorContext).executeOperationAsync(
1270                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1271
1272         inOrder.verify(mockActorContext).executeOperationAsync(
1273                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1274
1275         inOrder.verify(mockActorContext).executeOperationAsync(
1276                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1277
1278         inOrder.verify(mockActorContext).executeOperationAsync(
1279                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1280
1281         inOrder.verify(mockActorContext).executeOperationAsync(
1282                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1283
1284         inOrder.verify(mockActorContext).executeOperationAsync(
1285                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1286     }
1287
1288     @Test
1289     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1290
1291         SchemaContext schemaContext = SchemaContextHelper.full();
1292         Configuration configuration = mock(Configuration.class);
1293         doReturn(configuration).when(mockActorContext).getConfiguration();
1294         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1295         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1296
1297         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1298         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1299
1300         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1301         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1302
1303         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1304
1305         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1306
1307         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1308
1309         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1310
1311         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1312                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1313
1314         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1315
1316         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1317
1318         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1319
1320         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1321
1322         for(NormalizedNode<?,?> node : collection){
1323             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1324         }
1325
1326         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1327                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1328
1329         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1330
1331         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1332                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1333
1334         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1335     }
1336
1337
1338     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1339         ActorSystem actorSystem = getSystem();
1340         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1341
1342         doReturn(getSystem().actorSelection(shardActorRef.path())).
1343                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1344
1345         doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
1346                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1347
1348         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1349
1350         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1351
1352         doReturn(actorSystem.actorSelection(txActorRef.path())).
1353                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1354
1355         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1356                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1357                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1358
1359         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1360                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1361     }
1362 }