Use BatchedModifications message in place of ReadyTransaction message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
43 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
46 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
49 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
50 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import scala.concurrent.Promise;
63
64 @SuppressWarnings("resource")
65 public class TransactionProxyTest extends AbstractTransactionProxyTest {
66
67     @SuppressWarnings("serial")
68     static class TestException extends RuntimeException {
69     }
70
71     static interface Invoker {
72         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
73     }
74
75     @Test
76     public void testRead() throws Exception {
77         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
78
79         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
80
81         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
82                 eq(actorSelection(actorRef)), eqSerializedReadData());
83
84         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
85                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
86
87         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
88
89         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
90
91         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
92                 eq(actorSelection(actorRef)), eqSerializedReadData());
93
94         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
95
96         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
97
98         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
99     }
100
101     @Test(expected = ReadFailedException.class)
102     public void testReadWithInvalidReplyMessageType() throws Exception {
103         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
104
105         doReturn(Futures.successful(new Object())).when(mockActorContext).
106                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
107
108         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
109
110         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
111     }
112
113     @Test(expected = TestException.class)
114     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
115         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
116
117         doReturn(Futures.failed(new TestException())).when(mockActorContext).
118                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
119
120         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
121
122         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
123     }
124
125     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
126             throws Throwable {
127         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
128
129         if (exToThrow instanceof PrimaryNotFoundException) {
130             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
131         } else {
132             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
133                     when(mockActorContext).findPrimaryShardAsync(anyString());
134         }
135
136         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
137                 any(ActorSelection.class), any());
138
139         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
140
141         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
142     }
143
144     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
145         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
146             @Override
147             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
148                 return proxy.read(TestModel.TEST_PATH);
149             }
150         });
151     }
152
153     @Test(expected = PrimaryNotFoundException.class)
154     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
155         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
156     }
157
158     @Test(expected = TimeoutException.class)
159     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
160         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
161                 new Exception("reason")));
162     }
163
164     @Test(expected = TestException.class)
165     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
166         testReadWithExceptionOnInitialCreateTransaction(new TestException());
167     }
168
169     @Test
170     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
171         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
172
173         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174
175         expectBatchedModifications(actorRef, 1);
176
177         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
178                 eq(actorSelection(actorRef)), eqSerializedReadData());
179
180         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
181
182         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
183
184         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
185                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
186
187         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
188         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
189
190         InOrder inOrder = Mockito.inOrder(mockActorContext);
191         inOrder.verify(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
193
194         inOrder.verify(mockActorContext).executeOperationAsync(
195                 eq(actorSelection(actorRef)), eqSerializedReadData());
196     }
197
198     @Test(expected=IllegalStateException.class)
199     public void testReadPreConditionCheck() {
200         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
201         transactionProxy.read(TestModel.TEST_PATH);
202     }
203
204     @Test(expected=IllegalArgumentException.class)
205     public void testInvalidCreateTransactionReply() throws Throwable {
206         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
207
208         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
209             actorSelection(actorRef.path().toString());
210
211         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
212             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
213
214         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
215             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
216
217         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
218
219         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
220     }
221
222     @Test
223     public void testExists() throws Exception {
224         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
225
226         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
227
228         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
229                 eq(actorSelection(actorRef)), eqSerializedDataExists());
230
231         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
232
233         assertEquals("Exists response", false, exists);
234
235         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
236                 eq(actorSelection(actorRef)), eqSerializedDataExists());
237
238         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
239
240         assertEquals("Exists response", true, exists);
241     }
242
243     @Test(expected = PrimaryNotFoundException.class)
244     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
245         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
246             @Override
247             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
248                 return proxy.exists(TestModel.TEST_PATH);
249             }
250         });
251     }
252
253     @Test(expected = ReadFailedException.class)
254     public void testExistsWithInvalidReplyMessageType() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
256
257         doReturn(Futures.successful(new Object())).when(mockActorContext).
258                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
259
260         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
261                 READ_ONLY);
262
263         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
264     }
265
266     @Test(expected = TestException.class)
267     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
268         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
269
270         doReturn(Futures.failed(new TestException())).when(mockActorContext).
271                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
272
273         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
274
275         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
276     }
277
278     @Test
279     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
280         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
281
282         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
283
284         expectBatchedModifications(actorRef, 1);
285
286         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
287                 eq(actorSelection(actorRef)), eqSerializedDataExists());
288
289         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
290
291         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
292
293         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
294
295         assertEquals("Exists response", true, exists);
296
297         InOrder inOrder = Mockito.inOrder(mockActorContext);
298         inOrder.verify(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
300
301         inOrder.verify(mockActorContext).executeOperationAsync(
302                 eq(actorSelection(actorRef)), eqSerializedDataExists());
303     }
304
305     @Test(expected=IllegalStateException.class)
306     public void testExistsPreConditionCheck() {
307         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
308         transactionProxy.exists(TestModel.TEST_PATH);
309     }
310
311     @Test
312     public void testWrite() throws Exception {
313         dataStoreContextBuilder.shardBatchedModificationCount(1);
314         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
315
316         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
317
318         expectBatchedModifications(actorRef, 1);
319
320         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
321
322         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
323
324         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
325     }
326
327     @Test
328     public void testWriteAfterAsyncRead() throws Throwable {
329         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
330
331         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
332         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
333                 eq(getSystem().actorSelection(actorRef.path())),
334                 eqCreateTransaction(memberName, READ_WRITE));
335
336         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
337                 eq(actorSelection(actorRef)), eqSerializedReadData());
338
339         expectBatchedModificationsReady(actorRef);
340
341         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
342
343         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
344
345         final CountDownLatch readComplete = new CountDownLatch(1);
346         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
347         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
348                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
349                     @Override
350                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
351                         try {
352                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
353                         } catch (Exception e) {
354                             caughtEx.set(e);
355                         } finally {
356                             readComplete.countDown();
357                         }
358                     }
359
360                     @Override
361                     public void onFailure(Throwable t) {
362                         caughtEx.set(t);
363                         readComplete.countDown();
364                     }
365                 });
366
367         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
368
369         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
370
371         if(caughtEx.get() != null) {
372             throw caughtEx.get();
373         }
374
375         // This sends the batched modification.
376         transactionProxy.ready();
377
378         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
379     }
380
381     @Test(expected=IllegalStateException.class)
382     public void testWritePreConditionCheck() {
383         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
384         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385     }
386
387     @Test(expected=IllegalStateException.class)
388     public void testWriteAfterReadyPreConditionCheck() {
389         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
390
391         transactionProxy.ready();
392
393         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
394     }
395
396     @Test
397     public void testMerge() throws Exception {
398         dataStoreContextBuilder.shardBatchedModificationCount(1);
399         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
400
401         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
402
403         expectBatchedModifications(actorRef, 1);
404
405         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
406
407         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
408
409         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
410     }
411
412     @Test
413     public void testDelete() throws Exception {
414         dataStoreContextBuilder.shardBatchedModificationCount(1);
415         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
416
417         expectBatchedModifications(actorRef, 1);
418
419         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
420
421         transactionProxy.delete(TestModel.TEST_PATH);
422
423         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
424     }
425
426     @Test
427     public void testReadWrite() throws Exception {
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
429
430         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
431
432         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
433                 eq(actorSelection(actorRef)), eqSerializedReadData());
434
435         expectBatchedModifications(actorRef, 1);
436
437         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
438
439         transactionProxy.read(TestModel.TEST_PATH);
440
441         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
442
443         transactionProxy.read(TestModel.TEST_PATH);
444
445         transactionProxy.read(TestModel.TEST_PATH);
446
447         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
448         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
449
450         verifyBatchedModifications(batchedModifications.get(0), false,
451                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
452     }
453
454     @Test
455     public void testReadyWithReadWrite() throws Exception {
456         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
457
458         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459
460         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
461                 eq(actorSelection(actorRef)), eqSerializedReadData());
462
463         expectBatchedModificationsReady(actorRef);
464
465         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
466
467         transactionProxy.read(TestModel.TEST_PATH);
468
469         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
470
471         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
472
473         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
474
475         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
476
477         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
478
479         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
480                 isA(BatchedModifications.class));
481     }
482
483     @Test
484     public void testReadyWithNoModifications() throws Exception {
485         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
486
487         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
488                 eq(actorSelection(actorRef)), eqSerializedReadData());
489
490         expectBatchedModificationsReady(actorRef);
491
492         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
493
494         transactionProxy.read(TestModel.TEST_PATH);
495
496         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
497
498         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
499
500         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
501
502         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
503
504         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
505         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
506
507         verifyBatchedModifications(batchedModifications.get(0), true);
508     }
509
510     @Test
511     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
512         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
513
514         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
515
516         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
517
518         expectBatchedModificationsReady(actorRef);
519
520         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
521
522         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
523
524         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
525
526         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
527
528         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
529
530         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
531
532         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
533         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
534
535         verifyBatchedModifications(batchedModifications.get(0), true,
536                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
537
538         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
539                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
540     }
541
542     @Test
543     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
544         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
545         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
546
547         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
548
549         expectBatchedModificationsReady(actorRef);
550
551         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
552
553         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
554
555         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
556
557         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
558
559         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
560
561         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
562
563         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
564         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
565
566         verifyBatchedModifications(batchedModifications.get(0), false,
567                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
568
569         verifyBatchedModifications(batchedModifications.get(1), true);
570
571         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
572                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
573     }
574
575     @Test
576     public void testReadyWithReplyFailure() throws Exception {
577         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
578
579         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
580
581         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
582
583         expectFailedBatchedModifications(actorRef);
584
585         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
586
587         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
588
589         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
590
591         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
592
593         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
594
595         verifyCohortFutures(proxy, TestException.class);
596     }
597
598     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
599         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
600
601         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
602
603         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
604
605         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
606
607         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
608
609         transactionProxy.delete(TestModel.TEST_PATH);
610
611         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
612
613         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
614
615         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
616
617         verifyCohortFutures(proxy, toThrow.getClass());
618     }
619
620     @Test
621     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
622         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
623     }
624
625     @Test
626     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
627         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
628     }
629
630     @Test
631     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
632         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
633     }
634
635     @Test
636     public void testReadyWithInvalidReplyMessageType() throws Exception {
637         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
638         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
639
640         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
641
642         //expectBatchedModifications(actorRef, 1);
643
644         doReturn(Futures.successful(new Object())).when(mockActorContext).
645                 executeOperationAsync(eq(actorSelection(actorRef)),
646                         isA(BatchedModifications.class));
647
648         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
649
650         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
651
652         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
653
654         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
655
656         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
657
658         verifyCohortFutures(proxy, IllegalArgumentException.class);
659     }
660
661     @Test
662     public void testGetIdentifier() {
663         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
664         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
665                 TransactionProxy.TransactionType.READ_ONLY);
666
667         Object id = transactionProxy.getIdentifier();
668         assertNotNull("getIdentifier returned null", id);
669         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
670     }
671
672     @Test
673     public void testClose() throws Exception{
674         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
675
676         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
677                 eq(actorSelection(actorRef)), eqSerializedReadData());
678
679         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
680
681         transactionProxy.read(TestModel.TEST_PATH);
682
683         transactionProxy.close();
684
685         verify(mockActorContext).sendOperationAsync(
686                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
687     }
688
689
690     /**
691      * Method to test a local Tx actor. The Tx paths are matched to decide if the
692      * Tx actor is local or not. This is done by mocking the Tx actor path
693      * and the caller paths and ensuring that the paths have the remote-address format
694      *
695      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
696      * the paths returned for the actors for all the tests are not qualified remote paths.
697      * Hence are treated as non-local/remote actors. In short, all tests except
698      * few below run for remote actors
699      *
700      * @throws Exception
701      */
702     @Test
703     public void testLocalTxActorRead() throws Exception {
704         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
705         doReturn(true).when(mockActorContext).isPathLocal(anyString());
706
707         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
708
709         // negative test case with null as the reply
710         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
711             any(ActorSelection.class), eqReadData());
712
713         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
714             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
715
716         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
717
718         // test case with node as read data reply
719         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
720
721         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
722             any(ActorSelection.class), eqReadData());
723
724         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
725
726         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
727
728         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
729
730         // test for local data exists
731         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
732             any(ActorSelection.class), eqDataExists());
733
734         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
735
736         assertEquals("Exists response", true, exists);
737     }
738
739     @Test
740     public void testLocalTxActorReady() throws Exception {
741         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
742         doReturn(true).when(mockActorContext).isPathLocal(anyString());
743
744         expectBatchedModificationsReady(actorRef);
745
746         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
747
748         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
749         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
750
751         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
752
753         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
754
755         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
756
757         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
758     }
759
760     private static interface TransactionProxyOperation {
761         void run(TransactionProxy transactionProxy);
762     }
763
764     private void throttleOperation(TransactionProxyOperation operation) {
765         throttleOperation(operation, 1, true);
766     }
767
768     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
769         ActorSystem actorSystem = getSystem();
770         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
771
772         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
773
774         doReturn(actorSystem.actorSelection(shardActorRef.path())).
775                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
776
777         if(shardFound) {
778             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
779                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
780         } else {
781             doReturn(Futures.failed(new Exception("not found")))
782                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
783         }
784
785         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
786         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
787                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
788                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
789
790         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
791                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
792                         eqCreateTransaction(memberName, READ_WRITE));
793
794         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
795
796         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
797
798         long start = System.nanoTime();
799
800         operation.run(transactionProxy);
801
802         long end = System.nanoTime();
803
804         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
805         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
806                 expected, (end-start)), (end - start) > expected);
807
808     }
809
810     private void completeOperation(TransactionProxyOperation operation){
811         completeOperation(operation, true);
812     }
813
814     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
815         ActorSystem actorSystem = getSystem();
816         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
817
818         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
819
820         doReturn(actorSystem.actorSelection(shardActorRef.path())).
821                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
822
823         if(shardFound) {
824             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
825                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
826         } else {
827             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
828                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
829         }
830
831         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
832         String actorPath = txActorRef.path().toString();
833         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
834                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
835                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
836
837         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
838
839         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
840                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
841                         eqCreateTransaction(memberName, READ_WRITE));
842
843         doReturn(true).when(mockActorContext).isPathLocal(anyString());
844
845         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
846
847         long start = System.nanoTime();
848
849         operation.run(transactionProxy);
850
851         long end = System.nanoTime();
852
853         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
854         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
855                 expected, (end-start)), (end - start) <= expected);
856     }
857
858     public void testWriteThrottling(boolean shardFound){
859
860         throttleOperation(new TransactionProxyOperation() {
861             @Override
862             public void run(TransactionProxy transactionProxy) {
863                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
864
865                 expectBatchedModifications(2);
866
867                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
868
869                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
870             }
871         }, 1, shardFound);
872     }
873
874     @Test
875     public void testWriteThrottlingWhenShardFound(){
876         dataStoreContextBuilder.shardBatchedModificationCount(1);
877         throttleOperation(new TransactionProxyOperation() {
878             @Override
879             public void run(TransactionProxy transactionProxy) {
880                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
881
882                 expectIncompleteBatchedModifications();
883
884                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
885
886                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
887             }
888         });
889     }
890
891     @Test
892     public void testWriteThrottlingWhenShardNotFound(){
893         // Confirm that there is no throttling when the Shard is not found
894         dataStoreContextBuilder.shardBatchedModificationCount(1);
895         completeOperation(new TransactionProxyOperation() {
896             @Override
897             public void run(TransactionProxy transactionProxy) {
898                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
899
900                 expectBatchedModifications(2);
901
902                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
903
904                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
905             }
906         }, false);
907
908     }
909
910
911     @Test
912     public void testWriteCompletion(){
913         dataStoreContextBuilder.shardBatchedModificationCount(1);
914         completeOperation(new TransactionProxyOperation() {
915             @Override
916             public void run(TransactionProxy transactionProxy) {
917                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
918
919                 expectBatchedModifications(2);
920
921                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
922
923                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
924             }
925         });
926     }
927
928     @Test
929     public void testMergeThrottlingWhenShardFound(){
930         dataStoreContextBuilder.shardBatchedModificationCount(1);
931         throttleOperation(new TransactionProxyOperation() {
932             @Override
933             public void run(TransactionProxy transactionProxy) {
934                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
935
936                 expectIncompleteBatchedModifications();
937
938                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
939
940                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
941             }
942         });
943     }
944
945     @Test
946     public void testMergeThrottlingWhenShardNotFound(){
947         dataStoreContextBuilder.shardBatchedModificationCount(1);
948         completeOperation(new TransactionProxyOperation() {
949             @Override
950             public void run(TransactionProxy transactionProxy) {
951                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
952
953                 expectBatchedModifications(2);
954
955                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
956
957                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
958             }
959         }, false);
960     }
961
962     @Test
963     public void testMergeCompletion(){
964         dataStoreContextBuilder.shardBatchedModificationCount(1);
965         completeOperation(new TransactionProxyOperation() {
966             @Override
967             public void run(TransactionProxy transactionProxy) {
968                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
969
970                 expectBatchedModifications(2);
971
972                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
973
974                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
975             }
976         });
977
978     }
979
980     @Test
981     public void testDeleteThrottlingWhenShardFound(){
982
983         throttleOperation(new TransactionProxyOperation() {
984             @Override
985             public void run(TransactionProxy transactionProxy) {
986                 expectIncompleteBatchedModifications();
987
988                 transactionProxy.delete(TestModel.TEST_PATH);
989
990                 transactionProxy.delete(TestModel.TEST_PATH);
991             }
992         });
993     }
994
995
996     @Test
997     public void testDeleteThrottlingWhenShardNotFound(){
998
999         completeOperation(new TransactionProxyOperation() {
1000             @Override
1001             public void run(TransactionProxy transactionProxy) {
1002                 expectBatchedModifications(2);
1003
1004                 transactionProxy.delete(TestModel.TEST_PATH);
1005
1006                 transactionProxy.delete(TestModel.TEST_PATH);
1007             }
1008         }, false);
1009     }
1010
1011     @Test
1012     public void testDeleteCompletion(){
1013         dataStoreContextBuilder.shardBatchedModificationCount(1);
1014         completeOperation(new TransactionProxyOperation() {
1015             @Override
1016             public void run(TransactionProxy transactionProxy) {
1017                 expectBatchedModifications(2);
1018
1019                 transactionProxy.delete(TestModel.TEST_PATH);
1020
1021                 transactionProxy.delete(TestModel.TEST_PATH);
1022             }
1023         });
1024
1025     }
1026
1027     @Test
1028     public void testReadThrottlingWhenShardFound(){
1029
1030         throttleOperation(new TransactionProxyOperation() {
1031             @Override
1032             public void run(TransactionProxy transactionProxy) {
1033                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1034                         any(ActorSelection.class), eqReadData());
1035
1036                 transactionProxy.read(TestModel.TEST_PATH);
1037
1038                 transactionProxy.read(TestModel.TEST_PATH);
1039             }
1040         });
1041     }
1042
1043     @Test
1044     public void testReadThrottlingWhenShardNotFound(){
1045
1046         completeOperation(new TransactionProxyOperation() {
1047             @Override
1048             public void run(TransactionProxy transactionProxy) {
1049                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1050                         any(ActorSelection.class), eqReadData());
1051
1052                 transactionProxy.read(TestModel.TEST_PATH);
1053
1054                 transactionProxy.read(TestModel.TEST_PATH);
1055             }
1056         }, false);
1057     }
1058
1059
1060     @Test
1061     public void testReadCompletion(){
1062         completeOperation(new TransactionProxyOperation() {
1063             @Override
1064             public void run(TransactionProxy transactionProxy) {
1065                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1066
1067                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1068                         any(ActorSelection.class), eqReadData());
1069
1070                 transactionProxy.read(TestModel.TEST_PATH);
1071
1072                 transactionProxy.read(TestModel.TEST_PATH);
1073             }
1074         });
1075
1076     }
1077
1078     @Test
1079     public void testExistsThrottlingWhenShardFound(){
1080
1081         throttleOperation(new TransactionProxyOperation() {
1082             @Override
1083             public void run(TransactionProxy transactionProxy) {
1084                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1085                         any(ActorSelection.class), eqDataExists());
1086
1087                 transactionProxy.exists(TestModel.TEST_PATH);
1088
1089                 transactionProxy.exists(TestModel.TEST_PATH);
1090             }
1091         });
1092     }
1093
1094     @Test
1095     public void testExistsThrottlingWhenShardNotFound(){
1096
1097         completeOperation(new TransactionProxyOperation() {
1098             @Override
1099             public void run(TransactionProxy transactionProxy) {
1100                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1101                         any(ActorSelection.class), eqDataExists());
1102
1103                 transactionProxy.exists(TestModel.TEST_PATH);
1104
1105                 transactionProxy.exists(TestModel.TEST_PATH);
1106             }
1107         }, false);
1108     }
1109
1110
1111     @Test
1112     public void testExistsCompletion(){
1113         completeOperation(new TransactionProxyOperation() {
1114             @Override
1115             public void run(TransactionProxy transactionProxy) {
1116                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1117                         any(ActorSelection.class), eqDataExists());
1118
1119                 transactionProxy.exists(TestModel.TEST_PATH);
1120
1121                 transactionProxy.exists(TestModel.TEST_PATH);
1122             }
1123         });
1124
1125     }
1126
1127     @Test
1128     public void testReadyThrottling(){
1129
1130         throttleOperation(new TransactionProxyOperation() {
1131             @Override
1132             public void run(TransactionProxy transactionProxy) {
1133                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1134
1135                 expectBatchedModifications(1);
1136
1137                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1138                         any(ActorSelection.class), any(ReadyTransaction.class));
1139
1140                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1141
1142                 transactionProxy.ready();
1143             }
1144         });
1145     }
1146
1147     @Test
1148     public void testReadyThrottlingWithTwoTransactionContexts(){
1149
1150         throttleOperation(new TransactionProxyOperation() {
1151             @Override
1152             public void run(TransactionProxy transactionProxy) {
1153                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1154                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1155
1156                 expectBatchedModifications(2);
1157
1158                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1159                         any(ActorSelection.class), any(ReadyTransaction.class));
1160
1161                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1162
1163                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1164
1165                 transactionProxy.ready();
1166             }
1167         }, 2, true);
1168     }
1169
1170     private void testModificationOperationBatching(TransactionType type) throws Exception {
1171         int shardBatchedModificationCount = 3;
1172         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1173
1174         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1175
1176         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1177
1178         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1179         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1180
1181         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1182         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1183
1184         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1185         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1186
1187         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1188         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1189
1190         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1191         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1192
1193         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1194         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1195
1196         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1197         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1198
1199         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1200
1201         transactionProxy.write(writePath1, writeNode1);
1202         transactionProxy.write(writePath2, writeNode2);
1203         transactionProxy.delete(deletePath1);
1204         transactionProxy.merge(mergePath1, mergeNode1);
1205         transactionProxy.merge(mergePath2, mergeNode2);
1206         transactionProxy.write(writePath3, writeNode3);
1207         transactionProxy.merge(mergePath3, mergeNode3);
1208         transactionProxy.delete(deletePath2);
1209
1210         // This sends the last batch.
1211         transactionProxy.ready();
1212
1213         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1214         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1215
1216         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1217                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1218
1219         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1220                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1221
1222         verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
1223                 new DeleteModification(deletePath2));
1224     }
1225
1226     @Test
1227     public void testReadWriteModificationOperationBatching() throws Throwable {
1228         testModificationOperationBatching(READ_WRITE);
1229     }
1230
1231     @Test
1232     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1233         testModificationOperationBatching(WRITE_ONLY);
1234     }
1235
1236     @Test
1237     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1238         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1239         testModificationOperationBatching(WRITE_ONLY);
1240     }
1241
1242     @Test
1243     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1244
1245         int shardBatchedModificationCount = 10;
1246         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1247
1248         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1249
1250         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1251
1252         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1253         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1254
1255         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1256         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1257
1258         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1259         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1260
1261         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1262         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1263
1264         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1265
1266         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1267                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1268
1269         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1270                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1271
1272         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1273                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1274
1275         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1276
1277         transactionProxy.write(writePath1, writeNode1);
1278         transactionProxy.write(writePath2, writeNode2);
1279
1280         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1281                 get(5, TimeUnit.SECONDS);
1282
1283         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1284         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1285
1286         transactionProxy.merge(mergePath1, mergeNode1);
1287         transactionProxy.merge(mergePath2, mergeNode2);
1288
1289         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1290
1291         transactionProxy.delete(deletePath);
1292
1293         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1294         assertEquals("Exists response", true, exists);
1295
1296         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1297         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1298
1299         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1300         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1301
1302         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1303                 new WriteModification(writePath2, writeNode2));
1304
1305         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1306                 new MergeModification(mergePath2, mergeNode2));
1307
1308         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1309
1310         InOrder inOrder = Mockito.inOrder(mockActorContext);
1311         inOrder.verify(mockActorContext).executeOperationAsync(
1312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1313
1314         inOrder.verify(mockActorContext).executeOperationAsync(
1315                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1316
1317         inOrder.verify(mockActorContext).executeOperationAsync(
1318                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1319
1320         inOrder.verify(mockActorContext).executeOperationAsync(
1321                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1322
1323         inOrder.verify(mockActorContext).executeOperationAsync(
1324                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1325
1326         inOrder.verify(mockActorContext).executeOperationAsync(
1327                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1328     }
1329
1330     @Test
1331     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1332
1333         SchemaContext schemaContext = SchemaContextHelper.full();
1334         Configuration configuration = mock(Configuration.class);
1335         doReturn(configuration).when(mockActorContext).getConfiguration();
1336         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1337         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1338
1339         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1340         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1341
1342         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1343         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1344
1345         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1346
1347         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1348
1349         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1350
1351         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1352
1353         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1354                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1355
1356         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1357
1358         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1359
1360         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1361
1362         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1363
1364         for(NormalizedNode<?,?> node : collection){
1365             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1366         }
1367
1368         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1369                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1370
1371         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1372
1373         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1374                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1375
1376         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1377     }
1378
1379
1380     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1381         ActorSystem actorSystem = getSystem();
1382         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1383
1384         doReturn(getSystem().actorSelection(shardActorRef.path())).
1385                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1386
1387         doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
1388                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1389
1390         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1391
1392         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1393
1394         doReturn(actorSystem.actorSelection(txActorRef.path())).
1395                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1396
1397         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1398                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1399                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1400
1401         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1402                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1403     }
1404 }