Do not use ActorSystem.actorFor as it is deprecated
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.times;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.mockito.InOrder;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
36 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
37 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
45 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
48 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import scala.concurrent.Await;
54 import scala.concurrent.Future;
55 import scala.concurrent.Promise;
56 import scala.concurrent.duration.Duration;
57
58 @SuppressWarnings("resource")
59 public class TransactionProxyTest extends AbstractTransactionProxyTest {
60
61     @SuppressWarnings("serial")
62     static class TestException extends RuntimeException {
63     }
64
65     static interface Invoker {
66         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
67     }
68
69     @Test
70     public void testRead() throws Exception {
71         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
72
73         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
74
75         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
76                 eq(actorSelection(actorRef)), eqSerializedReadData());
77
78         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
79                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
80
81         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
82
83         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
84
85         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
86                 eq(actorSelection(actorRef)), eqSerializedReadData());
87
88         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
89
90         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
91
92         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
93     }
94
95     @Test(expected = ReadFailedException.class)
96     public void testReadWithInvalidReplyMessageType() throws Exception {
97         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
98
99         doReturn(Futures.successful(new Object())).when(mockActorContext).
100                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
101
102         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
103
104         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
105     }
106
107     @Test(expected = TestException.class)
108     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
109         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
110
111         doReturn(Futures.failed(new TestException())).when(mockActorContext).
112                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
113
114         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
115
116         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
117     }
118
119     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
120             throws Throwable {
121         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
122
123         if (exToThrow instanceof PrimaryNotFoundException) {
124             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
125         } else {
126             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
127                     when(mockActorContext).findPrimaryShardAsync(anyString());
128         }
129
130         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
131                 any(ActorSelection.class), any());
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
134
135         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
136     }
137
138     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
139         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
140             @Override
141             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
142                 return proxy.read(TestModel.TEST_PATH);
143             }
144         });
145     }
146
147     @Test(expected = PrimaryNotFoundException.class)
148     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
149         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
150     }
151
152     @Test(expected = TimeoutException.class)
153     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
154         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
155                 new Exception("reason")));
156     }
157
158     @Test(expected = TestException.class)
159     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
160         testReadWithExceptionOnInitialCreateTransaction(new TestException());
161     }
162
163     @Test(expected = TestException.class)
164     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
165         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
166                 when(mockActorContext).getDatastoreContext();
167
168         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
169
170         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
171
172         expectFailedBatchedModifications(actorRef);
173
174         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
175                 eq(actorSelection(actorRef)), eqSerializedReadData());
176
177         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
178
179         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
180
181         transactionProxy.delete(TestModel.TEST_PATH);
182
183         try {
184             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
185         } finally {
186             verify(mockActorContext, times(0)).executeOperationAsync(
187                     eq(actorSelection(actorRef)), eqSerializedReadData());
188         }
189     }
190
191     @Test
192     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
193         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
194
195         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
196
197         expectBatchedModifications(actorRef, 1);
198
199         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
200                 eq(actorSelection(actorRef)), eqSerializedReadData());
201
202         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
203
204         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
205
206         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
207                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
208
209         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
210         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
211
212         InOrder inOrder = Mockito.inOrder(mockActorContext);
213         inOrder.verify(mockActorContext).executeOperationAsync(
214                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
215
216         inOrder.verify(mockActorContext).executeOperationAsync(
217                 eq(actorSelection(actorRef)), eqSerializedReadData());
218     }
219
220     @Test(expected=IllegalStateException.class)
221     public void testReadPreConditionCheck() {
222         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
223         transactionProxy.read(TestModel.TEST_PATH);
224     }
225
226     @Test(expected=IllegalArgumentException.class)
227     public void testInvalidCreateTransactionReply() throws Throwable {
228         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
229
230         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
231             actorSelection(actorRef.path().toString());
232
233         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
234             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
235
236         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
237             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
238
239         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
240
241         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
242     }
243
244     @Test
245     public void testExists() throws Exception {
246         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
247
248         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
249
250         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
251                 eq(actorSelection(actorRef)), eqSerializedDataExists());
252
253         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
254
255         assertEquals("Exists response", false, exists);
256
257         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
258                 eq(actorSelection(actorRef)), eqSerializedDataExists());
259
260         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
261
262         assertEquals("Exists response", true, exists);
263     }
264
265     @Test(expected = PrimaryNotFoundException.class)
266     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
267         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
268             @Override
269             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
270                 return proxy.exists(TestModel.TEST_PATH);
271             }
272         });
273     }
274
275     @Test(expected = ReadFailedException.class)
276     public void testExistsWithInvalidReplyMessageType() throws Exception {
277         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
278
279         doReturn(Futures.successful(new Object())).when(mockActorContext).
280                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
281
282         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
283                 READ_ONLY);
284
285         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
286     }
287
288     @Test(expected = TestException.class)
289     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
290         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
291
292         doReturn(Futures.failed(new TestException())).when(mockActorContext).
293                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
294
295         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
296
297         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
298     }
299
300     @Test(expected = TestException.class)
301     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
302         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
303                 when(mockActorContext).getDatastoreContext();
304
305         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
306
307         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
308
309         expectFailedBatchedModifications(actorRef);
310
311         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), eqSerializedDataExists());
313
314         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
315                 READ_WRITE);
316
317         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
318
319         transactionProxy.delete(TestModel.TEST_PATH);
320
321         try {
322             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
323         } finally {
324             verify(mockActorContext, times(0)).executeOperationAsync(
325                     eq(actorSelection(actorRef)), eqSerializedDataExists());
326         }
327     }
328
329     @Test
330     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
331         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
332
333         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
334
335         expectBatchedModifications(actorRef, 1);
336
337         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
338                 eq(actorSelection(actorRef)), eqSerializedDataExists());
339
340         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
341
342         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
343
344         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
345
346         assertEquals("Exists response", true, exists);
347
348         InOrder inOrder = Mockito.inOrder(mockActorContext);
349         inOrder.verify(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
351
352         inOrder.verify(mockActorContext).executeOperationAsync(
353                 eq(actorSelection(actorRef)), eqSerializedDataExists());
354     }
355
356     @Test(expected=IllegalStateException.class)
357     public void testExistsPreConditionCheck() {
358         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
359         transactionProxy.exists(TestModel.TEST_PATH);
360     }
361
362     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
363             Class<?>... expResultTypes) throws Exception {
364         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
365
366         int i = 0;
367         for( Future<Object> future: futures) {
368             assertNotNull("Recording operation Future is null", future);
369
370             Class<?> expResultType = expResultTypes[i++];
371             if(Throwable.class.isAssignableFrom(expResultType)) {
372                 try {
373                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
374                     fail("Expected exception from recording operation Future");
375                 } catch(Exception e) {
376                     // Expected
377                 }
378             } else {
379                 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
380                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
381             }
382         }
383     }
384
385     @Test
386     public void testWrite() throws Exception {
387         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
388
389         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390
391         expectBatchedModifications(actorRef, 1);
392         expectReadyTransaction(actorRef);
393
394         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
395
396         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
397
398         // This sends the batched modification.
399         transactionProxy.ready();
400
401         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
402
403         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
404                 BatchedModificationsReply.class);
405     }
406
407     @Test
408     public void testWriteAfterAsyncRead() throws Throwable {
409         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
410
411         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
412         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
413                 eq(getSystem().actorSelection(actorRef.path())),
414                 eqCreateTransaction(memberName, READ_WRITE));
415
416         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
417                 eq(actorSelection(actorRef)), eqSerializedReadData());
418
419         expectBatchedModifications(actorRef, 1);
420         expectReadyTransaction(actorRef);
421
422         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
423
424         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
425
426         final CountDownLatch readComplete = new CountDownLatch(1);
427         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
428         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
429                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
430                     @Override
431                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
432                         try {
433                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
434                         } catch (Exception e) {
435                             caughtEx.set(e);
436                         } finally {
437                             readComplete.countDown();
438                         }
439                     }
440
441                     @Override
442                     public void onFailure(Throwable t) {
443                         caughtEx.set(t);
444                         readComplete.countDown();
445                     }
446                 });
447
448         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
449
450         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
451
452         if(caughtEx.get() != null) {
453             throw caughtEx.get();
454         }
455
456         // This sends the batched modification.
457         transactionProxy.ready();
458
459         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
460
461         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
462                 BatchedModificationsReply.class);
463     }
464
465     @Test(expected=IllegalStateException.class)
466     public void testWritePreConditionCheck() {
467         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
468         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
469     }
470
471     @Test(expected=IllegalStateException.class)
472     public void testWriteAfterReadyPreConditionCheck() {
473         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
474
475         transactionProxy.ready();
476
477         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
478     }
479
480     @Test
481     public void testMerge() throws Exception {
482         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
483
484         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
485
486         expectBatchedModifications(actorRef, 1);
487         expectReadyTransaction(actorRef);
488
489         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
490
491         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
492
493         // This sends the batched modification.
494         transactionProxy.ready();
495
496         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
497
498         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
499                 BatchedModificationsReply.class);
500     }
501
502     @Test
503     public void testDelete() throws Exception {
504         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
505
506         expectBatchedModifications(actorRef, 1);
507         expectReadyTransaction(actorRef);
508
509         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
510
511         transactionProxy.delete(TestModel.TEST_PATH);
512
513         // This sends the batched modification.
514         transactionProxy.ready();
515
516         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
517
518         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
519                 BatchedModificationsReply.class);
520     }
521
522     @Test
523     public void testReady() throws Exception {
524         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
525
526         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
527
528         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
529                 eq(actorSelection(actorRef)), eqSerializedReadData());
530
531         expectBatchedModifications(actorRef, 1);
532         expectReadyTransaction(actorRef);
533
534         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
535
536         transactionProxy.read(TestModel.TEST_PATH);
537
538         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
539
540         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
541
542         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
543
544         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
545
546         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
547                 BatchedModificationsReply.class);
548
549         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
550
551         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
552                 isA(BatchedModifications.class));
553     }
554
555     @Test
556     public void testReadyWithRecordingOperationFailure() throws Exception {
557         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
558
559         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
560
561         expectFailedBatchedModifications(actorRef);
562
563         expectReadyTransaction(actorRef);
564
565         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
566
567         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
568
569         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
570
571         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
572
573         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
574
575         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
576
577         verifyCohortFutures(proxy, TestException.class);
578
579         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
580     }
581
582     @Test
583     public void testReadyWithReplyFailure() throws Exception {
584         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
585
586         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
587
588         expectBatchedModifications(actorRef, 1);
589
590         doReturn(Futures.failed(new TestException())).when(mockActorContext).
591                 executeOperationAsync(eq(actorSelection(actorRef)),
592                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
593
594         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
595
596         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
597
598         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
599
600         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
601
602         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
603
604         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
605                 BatchedModificationsReply.class);
606
607         verifyCohortFutures(proxy, TestException.class);
608     }
609
610     @Test
611     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
612
613         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
614                 mockActorContext).findPrimaryShardAsync(anyString());
615
616         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
617
618         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
619
620         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
621
622         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
623
624         transactionProxy.delete(TestModel.TEST_PATH);
625
626         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
627
628         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
629
630         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
631
632         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
633     }
634
635     @Test
636     public void testReadyWithInvalidReplyMessageType() throws Exception {
637         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
638
639         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
640
641         expectBatchedModifications(actorRef, 1);
642
643         doReturn(Futures.successful(new Object())).when(mockActorContext).
644                 executeOperationAsync(eq(actorSelection(actorRef)),
645                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
646
647         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
648
649         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
650
651         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
652
653         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
654
655         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
656
657         verifyCohortFutures(proxy, IllegalArgumentException.class);
658     }
659
660     @Test
661     public void testUnusedTransaction() throws Exception {
662         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
663
664         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
665
666         assertEquals("canCommit", true, ready.canCommit().get());
667         ready.preCommit().get();
668         ready.commit().get();
669     }
670
671     @Test
672     public void testGetIdentifier() {
673         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
674         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
675                 TransactionProxy.TransactionType.READ_ONLY);
676
677         Object id = transactionProxy.getIdentifier();
678         assertNotNull("getIdentifier returned null", id);
679         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
680     }
681
682     @Test
683     public void testClose() throws Exception{
684         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
685
686         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
687                 eq(actorSelection(actorRef)), eqSerializedReadData());
688
689         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
690
691         transactionProxy.read(TestModel.TEST_PATH);
692
693         transactionProxy.close();
694
695         verify(mockActorContext).sendOperationAsync(
696                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
697     }
698
699
700     /**
701      * Method to test a local Tx actor. The Tx paths are matched to decide if the
702      * Tx actor is local or not. This is done by mocking the Tx actor path
703      * and the caller paths and ensuring that the paths have the remote-address format
704      *
705      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
706      * the paths returned for the actors for all the tests are not qualified remote paths.
707      * Hence are treated as non-local/remote actors. In short, all tests except
708      * few below run for remote actors
709      *
710      * @throws Exception
711      */
712     @Test
713     public void testLocalTxActorRead() throws Exception {
714         ActorSystem actorSystem = getSystem();
715         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
716
717         doReturn(actorSystem.actorSelection(shardActorRef.path())).
718             when(mockActorContext).actorSelection(shardActorRef.path().toString());
719
720         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
721             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
722
723         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
724         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
725             .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
726
727         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
728             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
729                 eqCreateTransaction(memberName, READ_ONLY));
730
731         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
732
733         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
734
735         // negative test case with null as the reply
736         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
737             any(ActorSelection.class), eqReadData());
738
739         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
740             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
741
742         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
743
744         // test case with node as read data reply
745         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
746
747         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
748             any(ActorSelection.class), eqReadData());
749
750         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
751
752         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
753
754         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
755
756         // test for local data exists
757         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
758             any(ActorSelection.class), eqDataExists());
759
760         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
761
762         assertEquals("Exists response", true, exists);
763     }
764
765     @Test
766     public void testLocalTxActorReady() throws Exception {
767         ActorSystem actorSystem = getSystem();
768         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
769
770         doReturn(actorSystem.actorSelection(shardActorRef.path())).
771             when(mockActorContext).actorSelection(shardActorRef.path().toString());
772
773         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
774             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
775
776         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
777         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
778             setTransactionId("txn-1").setTransactionActorPath(actorPath).
779             setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
780
781         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
782             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
783                 eqCreateTransaction(memberName, WRITE_ONLY));
784
785         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
786
787         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
788                 any(ActorSelection.class), isA(BatchedModifications.class));
789
790         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
791
792         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
793         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
794
795         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
796                 BatchedModificationsReply.class);
797
798         // testing ready
799         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
800             any(ActorSelection.class), isA(ReadyTransaction.class));
801
802         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
803
804         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
805
806         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
807
808         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
809     }
810
811     private static interface TransactionProxyOperation {
812         void run(TransactionProxy transactionProxy);
813     }
814
815     private void throttleOperation(TransactionProxyOperation operation) {
816         throttleOperation(operation, 1, true);
817     }
818
819     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
820         ActorSystem actorSystem = getSystem();
821         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
822
823         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
824
825         doReturn(actorSystem.actorSelection(shardActorRef.path())).
826                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
827
828         if(shardFound) {
829             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
830                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
831         } else {
832             doReturn(Futures.failed(new Exception("not found")))
833                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
834         }
835
836         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
837         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
838                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
839                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
840
841         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
842                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
843                         eqCreateTransaction(memberName, READ_WRITE));
844
845         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
846
847         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
848
849         long start = System.nanoTime();
850
851         operation.run(transactionProxy);
852
853         long end = System.nanoTime();
854
855         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
856         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
857                 expected, (end-start)), (end - start) > expected);
858
859     }
860
861     private void completeOperation(TransactionProxyOperation operation){
862         completeOperation(operation, true);
863     }
864
865     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
866         ActorSystem actorSystem = getSystem();
867         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
868
869         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
870
871         doReturn(actorSystem.actorSelection(shardActorRef.path())).
872                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
873
874         if(shardFound) {
875             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
876                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
877         } else {
878             doReturn(Futures.failed(new Exception("not found")))
879                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
880         }
881
882         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
883         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
884                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
885                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
886
887         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
888                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
889                         eqCreateTransaction(memberName, READ_WRITE));
890
891         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
892
893         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
894
895         long start = System.nanoTime();
896
897         operation.run(transactionProxy);
898
899         long end = System.nanoTime();
900
901         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
902         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
903                 expected, (end-start)), (end - start) <= expected);
904     }
905
906     public void testWriteThrottling(boolean shardFound){
907
908         throttleOperation(new TransactionProxyOperation() {
909             @Override
910             public void run(TransactionProxy transactionProxy) {
911                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
912
913                 expectBatchedModifications(2);
914
915                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
916
917                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
918             }
919         }, 1, shardFound);
920     }
921
922     @Test
923     public void testWriteThrottlingWhenShardFound(){
924         throttleOperation(new TransactionProxyOperation() {
925             @Override
926             public void run(TransactionProxy transactionProxy) {
927                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
928
929                 expectIncompleteBatchedModifications();
930
931                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
932
933                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
934             }
935         });
936     }
937
938     @Test
939     public void testWriteThrottlingWhenShardNotFound(){
940         // Confirm that there is no throttling when the Shard is not found
941         completeOperation(new TransactionProxyOperation() {
942             @Override
943             public void run(TransactionProxy transactionProxy) {
944                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
945
946                 expectBatchedModifications(2);
947
948                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
949
950                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
951             }
952         }, false);
953
954     }
955
956
957     @Test
958     public void testWriteCompletion(){
959         completeOperation(new TransactionProxyOperation() {
960             @Override
961             public void run(TransactionProxy transactionProxy) {
962                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
963
964                 expectBatchedModifications(2);
965
966                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
967
968                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
969             }
970         });
971     }
972
973     @Test
974     public void testMergeThrottlingWhenShardFound(){
975
976         throttleOperation(new TransactionProxyOperation() {
977             @Override
978             public void run(TransactionProxy transactionProxy) {
979                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
980
981                 expectIncompleteBatchedModifications();
982
983                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
984
985                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
986             }
987         });
988     }
989
990     @Test
991     public void testMergeThrottlingWhenShardNotFound(){
992
993         completeOperation(new TransactionProxyOperation() {
994             @Override
995             public void run(TransactionProxy transactionProxy) {
996                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
997
998                 expectBatchedModifications(2);
999
1000                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1001
1002                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1003             }
1004         }, false);
1005     }
1006
1007     @Test
1008     public void testMergeCompletion(){
1009         completeOperation(new TransactionProxyOperation() {
1010             @Override
1011             public void run(TransactionProxy transactionProxy) {
1012                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1013
1014                 expectBatchedModifications(2);
1015
1016                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1017
1018                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1019             }
1020         });
1021
1022     }
1023
1024     @Test
1025     public void testDeleteThrottlingWhenShardFound(){
1026
1027         throttleOperation(new TransactionProxyOperation() {
1028             @Override
1029             public void run(TransactionProxy transactionProxy) {
1030                 expectIncompleteBatchedModifications();
1031
1032                 transactionProxy.delete(TestModel.TEST_PATH);
1033
1034                 transactionProxy.delete(TestModel.TEST_PATH);
1035             }
1036         });
1037     }
1038
1039
1040     @Test
1041     public void testDeleteThrottlingWhenShardNotFound(){
1042
1043         completeOperation(new TransactionProxyOperation() {
1044             @Override
1045             public void run(TransactionProxy transactionProxy) {
1046                 expectBatchedModifications(2);
1047
1048                 transactionProxy.delete(TestModel.TEST_PATH);
1049
1050                 transactionProxy.delete(TestModel.TEST_PATH);
1051             }
1052         }, false);
1053     }
1054
1055     @Test
1056     public void testDeleteCompletion(){
1057         completeOperation(new TransactionProxyOperation() {
1058             @Override
1059             public void run(TransactionProxy transactionProxy) {
1060                 expectBatchedModifications(2);
1061
1062                 transactionProxy.delete(TestModel.TEST_PATH);
1063
1064                 transactionProxy.delete(TestModel.TEST_PATH);
1065             }
1066         });
1067
1068     }
1069
1070     @Test
1071     public void testReadThrottlingWhenShardFound(){
1072
1073         throttleOperation(new TransactionProxyOperation() {
1074             @Override
1075             public void run(TransactionProxy transactionProxy) {
1076                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1077                         any(ActorSelection.class), eqReadData());
1078
1079                 transactionProxy.read(TestModel.TEST_PATH);
1080
1081                 transactionProxy.read(TestModel.TEST_PATH);
1082             }
1083         });
1084     }
1085
1086     @Test
1087     public void testReadThrottlingWhenShardNotFound(){
1088
1089         completeOperation(new TransactionProxyOperation() {
1090             @Override
1091             public void run(TransactionProxy transactionProxy) {
1092                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1093                         any(ActorSelection.class), eqReadData());
1094
1095                 transactionProxy.read(TestModel.TEST_PATH);
1096
1097                 transactionProxy.read(TestModel.TEST_PATH);
1098             }
1099         }, false);
1100     }
1101
1102
1103     @Test
1104     public void testReadCompletion(){
1105         completeOperation(new TransactionProxyOperation() {
1106             @Override
1107             public void run(TransactionProxy transactionProxy) {
1108                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1109
1110                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1111                         any(ActorSelection.class), eqReadData());
1112
1113                 transactionProxy.read(TestModel.TEST_PATH);
1114
1115                 transactionProxy.read(TestModel.TEST_PATH);
1116             }
1117         });
1118
1119     }
1120
1121     @Test
1122     public void testExistsThrottlingWhenShardFound(){
1123
1124         throttleOperation(new TransactionProxyOperation() {
1125             @Override
1126             public void run(TransactionProxy transactionProxy) {
1127                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1128                         any(ActorSelection.class), eqDataExists());
1129
1130                 transactionProxy.exists(TestModel.TEST_PATH);
1131
1132                 transactionProxy.exists(TestModel.TEST_PATH);
1133             }
1134         });
1135     }
1136
1137     @Test
1138     public void testExistsThrottlingWhenShardNotFound(){
1139
1140         completeOperation(new TransactionProxyOperation() {
1141             @Override
1142             public void run(TransactionProxy transactionProxy) {
1143                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1144                         any(ActorSelection.class), eqDataExists());
1145
1146                 transactionProxy.exists(TestModel.TEST_PATH);
1147
1148                 transactionProxy.exists(TestModel.TEST_PATH);
1149             }
1150         }, false);
1151     }
1152
1153
1154     @Test
1155     public void testExistsCompletion(){
1156         completeOperation(new TransactionProxyOperation() {
1157             @Override
1158             public void run(TransactionProxy transactionProxy) {
1159                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1160                         any(ActorSelection.class), eqDataExists());
1161
1162                 transactionProxy.exists(TestModel.TEST_PATH);
1163
1164                 transactionProxy.exists(TestModel.TEST_PATH);
1165             }
1166         });
1167
1168     }
1169
1170     @Test
1171     public void testReadyThrottling(){
1172
1173         throttleOperation(new TransactionProxyOperation() {
1174             @Override
1175             public void run(TransactionProxy transactionProxy) {
1176                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1177
1178                 expectBatchedModifications(1);
1179
1180                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1181                         any(ActorSelection.class), any(ReadyTransaction.class));
1182
1183                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1184
1185                 transactionProxy.ready();
1186             }
1187         });
1188     }
1189
1190     @Test
1191     public void testReadyThrottlingWithTwoTransactionContexts(){
1192
1193         throttleOperation(new TransactionProxyOperation() {
1194             @Override
1195             public void run(TransactionProxy transactionProxy) {
1196                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1197                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1198
1199                 expectBatchedModifications(2);
1200
1201                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1202                         any(ActorSelection.class), any(ReadyTransaction.class));
1203
1204                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1205
1206                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1207
1208                 transactionProxy.ready();
1209             }
1210         }, 2, true);
1211     }
1212
1213     @Test
1214     public void testModificationOperationBatching() throws Throwable {
1215         int shardBatchedModificationCount = 3;
1216         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
1217                 when(mockActorContext).getDatastoreContext();
1218
1219         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1220
1221         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1222
1223         expectReadyTransaction(actorRef);
1224
1225         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1226         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1227
1228         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1229         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1230
1231         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1232         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1233
1234         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1235         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1236
1237         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1238         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1239
1240         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1241         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1242
1243         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1244         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1245
1246         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1247
1248         transactionProxy.write(writePath1, writeNode1);
1249         transactionProxy.write(writePath2, writeNode2);
1250         transactionProxy.delete(deletePath1);
1251         transactionProxy.merge(mergePath1, mergeNode1);
1252         transactionProxy.merge(mergePath2, mergeNode2);
1253         transactionProxy.write(writePath3, writeNode3);
1254         transactionProxy.merge(mergePath3, mergeNode3);
1255         transactionProxy.delete(deletePath2);
1256
1257         // This sends the last batch.
1258         transactionProxy.ready();
1259
1260         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1261         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1262
1263         verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
1264                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1265
1266         verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
1267                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1268
1269         verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
1270                 new DeleteModification(deletePath2));
1271
1272         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1273                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1274     }
1275
1276     @Test
1277     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1278         int shardBatchedModificationCount = 10;
1279         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
1280                 when(mockActorContext).getDatastoreContext();
1281
1282         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1283
1284         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1285
1286         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1287         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1288
1289         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1290         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1291
1292         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1293         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1294
1295         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1296         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1297
1298         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1299
1300         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1301                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1302
1303         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1304                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1305
1306         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1307                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1308
1309         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1310
1311         transactionProxy.write(writePath1, writeNode1);
1312         transactionProxy.write(writePath2, writeNode2);
1313
1314         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1315                 get(5, TimeUnit.SECONDS);
1316
1317         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1318         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1319
1320         transactionProxy.merge(mergePath1, mergeNode1);
1321         transactionProxy.merge(mergePath2, mergeNode2);
1322
1323         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1324
1325         transactionProxy.delete(deletePath);
1326
1327         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1328         assertEquals("Exists response", true, exists);
1329
1330         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1331         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1332
1333         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1334         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1335
1336         verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
1337                 new WriteModification(writePath2, writeNode2));
1338
1339         verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
1340                 new MergeModification(mergePath2, mergeNode2));
1341
1342         verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
1343
1344         InOrder inOrder = Mockito.inOrder(mockActorContext);
1345         inOrder.verify(mockActorContext).executeOperationAsync(
1346                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1347
1348         inOrder.verify(mockActorContext).executeOperationAsync(
1349                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1350
1351         inOrder.verify(mockActorContext).executeOperationAsync(
1352                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1353
1354         inOrder.verify(mockActorContext).executeOperationAsync(
1355                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1356
1357         inOrder.verify(mockActorContext).executeOperationAsync(
1358                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1359
1360         inOrder.verify(mockActorContext).executeOperationAsync(
1361                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1362
1363         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1364                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1365     }
1366 }