Optimize TransactionProxy for write-only transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import com.google.common.base.Optional;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.InOrder;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
36 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
38 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
39 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
40 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import scala.concurrent.Await;
56 import scala.concurrent.Future;
57 import scala.concurrent.Promise;
58 import scala.concurrent.duration.Duration;
59
60 @SuppressWarnings("resource")
61 public class TransactionProxyTest extends AbstractTransactionProxyTest {
62
63     @SuppressWarnings("serial")
64     static class TestException extends RuntimeException {
65     }
66
67     static interface Invoker {
68         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
69     }
70
71     @Test
72     public void testRead() throws Exception {
73         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
74
75         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
76
77         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
78                 eq(actorSelection(actorRef)), eqSerializedReadData());
79
80         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
81                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
82
83         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
84
85         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
86
87         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
88                 eq(actorSelection(actorRef)), eqSerializedReadData());
89
90         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
91
92         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
93
94         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
95     }
96
97     @Test(expected = ReadFailedException.class)
98     public void testReadWithInvalidReplyMessageType() throws Exception {
99         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
100
101         doReturn(Futures.successful(new Object())).when(mockActorContext).
102                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
103
104         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
105
106         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
107     }
108
109     @Test(expected = TestException.class)
110     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
111         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
112
113         doReturn(Futures.failed(new TestException())).when(mockActorContext).
114                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
115
116         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
117
118         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
119     }
120
121     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
122             throws Throwable {
123         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
124
125         if (exToThrow instanceof PrimaryNotFoundException) {
126             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
127         } else {
128             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
129                     when(mockActorContext).findPrimaryShardAsync(anyString());
130         }
131
132         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
133                 any(ActorSelection.class), any());
134
135         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
136
137         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
138     }
139
140     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
141         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
142             @Override
143             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
144                 return proxy.read(TestModel.TEST_PATH);
145             }
146         });
147     }
148
149     @Test(expected = PrimaryNotFoundException.class)
150     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
151         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
152     }
153
154     @Test(expected = TimeoutException.class)
155     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
156         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
157                 new Exception("reason")));
158     }
159
160     @Test(expected = TestException.class)
161     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
162         testReadWithExceptionOnInitialCreateTransaction(new TestException());
163     }
164
165     @Test(expected = TestException.class)
166     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
167         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
168                 when(mockActorContext).getDatastoreContext();
169
170         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
171
172         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
173
174         expectFailedBatchedModifications(actorRef);
175
176         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
177                 eq(actorSelection(actorRef)), eqSerializedReadData());
178
179         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
180
181         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
182
183         transactionProxy.delete(TestModel.TEST_PATH);
184
185         try {
186             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
187         } finally {
188             verify(mockActorContext, times(0)).executeOperationAsync(
189                     eq(actorSelection(actorRef)), eqSerializedReadData());
190         }
191     }
192
193     @Test
194     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
195         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
196
197         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
198
199         expectBatchedModifications(actorRef, 1);
200
201         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
202                 eq(actorSelection(actorRef)), eqSerializedReadData());
203
204         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
205
206         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
207
208         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
209                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
210
211         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
212         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
213
214         InOrder inOrder = Mockito.inOrder(mockActorContext);
215         inOrder.verify(mockActorContext).executeOperationAsync(
216                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
217
218         inOrder.verify(mockActorContext).executeOperationAsync(
219                 eq(actorSelection(actorRef)), eqSerializedReadData());
220     }
221
222     @Test(expected=IllegalStateException.class)
223     public void testReadPreConditionCheck() {
224         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
225         transactionProxy.read(TestModel.TEST_PATH);
226     }
227
228     @Test(expected=IllegalArgumentException.class)
229     public void testInvalidCreateTransactionReply() throws Throwable {
230         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
231
232         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
233             actorSelection(actorRef.path().toString());
234
235         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
236             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
237
238         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
239             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
240
241         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
242
243         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
244     }
245
246     @Test
247     public void testExists() throws Exception {
248         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
249
250         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
251
252         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
253                 eq(actorSelection(actorRef)), eqSerializedDataExists());
254
255         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
256
257         assertEquals("Exists response", false, exists);
258
259         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
260                 eq(actorSelection(actorRef)), eqSerializedDataExists());
261
262         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
263
264         assertEquals("Exists response", true, exists);
265     }
266
267     @Test(expected = PrimaryNotFoundException.class)
268     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
269         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
270             @Override
271             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
272                 return proxy.exists(TestModel.TEST_PATH);
273             }
274         });
275     }
276
277     @Test(expected = ReadFailedException.class)
278     public void testExistsWithInvalidReplyMessageType() throws Exception {
279         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
280
281         doReturn(Futures.successful(new Object())).when(mockActorContext).
282                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
283
284         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
285                 READ_ONLY);
286
287         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
288     }
289
290     @Test(expected = TestException.class)
291     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
292         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
293
294         doReturn(Futures.failed(new TestException())).when(mockActorContext).
295                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
296
297         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
298
299         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
300     }
301
302     @Test(expected = TestException.class)
303     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
304         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
305                 when(mockActorContext).getDatastoreContext();
306
307         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
308
309         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
310
311         expectFailedBatchedModifications(actorRef);
312
313         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
314                 eq(actorSelection(actorRef)), eqSerializedDataExists());
315
316         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
317                 READ_WRITE);
318
319         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
320
321         transactionProxy.delete(TestModel.TEST_PATH);
322
323         try {
324             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
325         } finally {
326             verify(mockActorContext, times(0)).executeOperationAsync(
327                     eq(actorSelection(actorRef)), eqSerializedDataExists());
328         }
329     }
330
331     @Test
332     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
333         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
334
335         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
336
337         expectBatchedModifications(actorRef, 1);
338
339         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
340                 eq(actorSelection(actorRef)), eqSerializedDataExists());
341
342         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
343
344         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
345
346         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
347
348         assertEquals("Exists response", true, exists);
349
350         InOrder inOrder = Mockito.inOrder(mockActorContext);
351         inOrder.verify(mockActorContext).executeOperationAsync(
352                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
353
354         inOrder.verify(mockActorContext).executeOperationAsync(
355                 eq(actorSelection(actorRef)), eqSerializedDataExists());
356     }
357
358     @Test(expected=IllegalStateException.class)
359     public void testExistsPreConditionCheck() {
360         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
361         transactionProxy.exists(TestModel.TEST_PATH);
362     }
363
364     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
365             Class<?>... expResultTypes) throws Exception {
366         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
367
368         int i = 0;
369         for( Future<Object> future: futures) {
370             assertNotNull("Recording operation Future is null", future);
371
372             Class<?> expResultType = expResultTypes[i++];
373             if(Throwable.class.isAssignableFrom(expResultType)) {
374                 try {
375                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
376                     fail("Expected exception from recording operation Future");
377                 } catch(Exception e) {
378                     // Expected
379                 }
380             } else {
381                 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
382                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
383             }
384         }
385     }
386
387     @Test
388     public void testWrite() throws Exception {
389         dataStoreContextBuilder.shardBatchedModificationCount(1);
390         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
391
392         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
393
394         expectBatchedModifications(actorRef, 1);
395
396         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
397
398         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
399
400         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
401     }
402
403     @Test
404     public void testWriteAfterAsyncRead() throws Throwable {
405         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
406
407         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
408         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
409                 eq(getSystem().actorSelection(actorRef.path())),
410                 eqCreateTransaction(memberName, READ_WRITE));
411
412         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
413                 eq(actorSelection(actorRef)), eqSerializedReadData());
414
415         expectBatchedModifications(actorRef, 1);
416         expectReadyTransaction(actorRef);
417
418         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
419
420         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
421
422         final CountDownLatch readComplete = new CountDownLatch(1);
423         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
424         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
425                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
426                     @Override
427                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
428                         try {
429                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
430                         } catch (Exception e) {
431                             caughtEx.set(e);
432                         } finally {
433                             readComplete.countDown();
434                         }
435                     }
436
437                     @Override
438                     public void onFailure(Throwable t) {
439                         caughtEx.set(t);
440                         readComplete.countDown();
441                     }
442                 });
443
444         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
445
446         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
447
448         if(caughtEx.get() != null) {
449             throw caughtEx.get();
450         }
451
452         // This sends the batched modification.
453         transactionProxy.ready();
454
455         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
456
457         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
458                 BatchedModificationsReply.class);
459     }
460
461     @Test(expected=IllegalStateException.class)
462     public void testWritePreConditionCheck() {
463         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
464         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
465     }
466
467     @Test(expected=IllegalStateException.class)
468     public void testWriteAfterReadyPreConditionCheck() {
469         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
470
471         transactionProxy.ready();
472
473         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
474     }
475
476     @Test
477     public void testMerge() throws Exception {
478         dataStoreContextBuilder.shardBatchedModificationCount(1);
479         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
480
481         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
482
483         expectBatchedModifications(actorRef, 1);
484
485         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
486
487         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
488
489         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
490     }
491
492     @Test
493     public void testDelete() throws Exception {
494         dataStoreContextBuilder.shardBatchedModificationCount(1);
495         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
496
497         expectBatchedModifications(actorRef, 1);
498
499         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
500
501         transactionProxy.delete(TestModel.TEST_PATH);
502
503         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
504     }
505
506     @Test
507     public void testReadyWithReadWrite() throws Exception {
508         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
509
510         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
511
512         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
513                 eq(actorSelection(actorRef)), eqSerializedReadData());
514
515         expectBatchedModifications(actorRef, 1);
516         expectReadyTransaction(actorRef);
517
518         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
519
520         transactionProxy.read(TestModel.TEST_PATH);
521
522         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
523
524         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
525
526         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
527
528         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
529
530         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
531                 BatchedModificationsReply.class);
532
533         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
534
535         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
536                 isA(BatchedModifications.class));
537
538         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
539                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
540     }
541
542     @Test
543     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
544         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
545
546         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
547
548         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
549
550         expectBatchedModificationsReady(actorRef, 1);
551
552         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
553
554         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
555
556         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
557
558         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
559
560         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
561
562         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
563
564         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571
572         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
573                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
574     }
575
576     @Test
577     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
578         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
579         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
580
581         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
582
583         expectBatchedModificationsReady(actorRef, 1);
584
585         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
586
587         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
588
589         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
590
591         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
592
593         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
594
595         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
596                 BatchedModificationsReply.class);
597
598         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
599
600         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
601         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
602
603         verifyBatchedModifications(batchedModifications.get(0), false,
604                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
605
606         verifyBatchedModifications(batchedModifications.get(1), true);
607
608         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
609                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
610     }
611
612     @Test
613     public void testReadyWithRecordingOperationFailure() throws Exception {
614         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
615
616         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
617
618         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
619
620         expectFailedBatchedModifications(actorRef);
621
622         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
623
624         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
625
626         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
627
628         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
629
630         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
631
632         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
633
634         verifyCohortFutures(proxy, TestException.class);
635
636         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
637     }
638
639     @Test
640     public void testReadyWithReplyFailure() throws Exception {
641         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
642
643         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
644
645         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
646
647         expectFailedBatchedModifications(actorRef);
648
649         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
650
651         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
652
653         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
654
655         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
656
657         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
658
659         verifyCohortFutures(proxy, TestException.class);
660     }
661
662     @Test
663     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
664
665         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
666                 mockActorContext).findPrimaryShardAsync(anyString());
667
668         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
669
670         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
671
672         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
673
674         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
675
676         transactionProxy.delete(TestModel.TEST_PATH);
677
678         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
679
680         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
681
682         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
683
684         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
685     }
686
687     @Test
688     public void testReadyWithInvalidReplyMessageType() throws Exception {
689         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
690         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
691
692         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
693
694         //expectBatchedModifications(actorRef, 1);
695
696         doReturn(Futures.successful(new Object())).when(mockActorContext).
697                 executeOperationAsync(eq(actorSelection(actorRef)),
698                         isA(BatchedModifications.class));
699
700         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
701
702         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
703
704         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
705
706         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
707
708         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
709
710         verifyCohortFutures(proxy, IllegalArgumentException.class);
711     }
712
713     @Test
714     public void testGetIdentifier() {
715         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
716         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
717                 TransactionProxy.TransactionType.READ_ONLY);
718
719         Object id = transactionProxy.getIdentifier();
720         assertNotNull("getIdentifier returned null", id);
721         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
722     }
723
724     @Test
725     public void testClose() throws Exception{
726         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
727
728         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
729                 eq(actorSelection(actorRef)), eqSerializedReadData());
730
731         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
732
733         transactionProxy.read(TestModel.TEST_PATH);
734
735         transactionProxy.close();
736
737         verify(mockActorContext).sendOperationAsync(
738                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
739     }
740
741
742     /**
743      * Method to test a local Tx actor. The Tx paths are matched to decide if the
744      * Tx actor is local or not. This is done by mocking the Tx actor path
745      * and the caller paths and ensuring that the paths have the remote-address format
746      *
747      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
748      * the paths returned for the actors for all the tests are not qualified remote paths.
749      * Hence are treated as non-local/remote actors. In short, all tests except
750      * few below run for remote actors
751      *
752      * @throws Exception
753      */
754     @Test
755     public void testLocalTxActorRead() throws Exception {
756         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
757         doReturn(true).when(mockActorContext).isPathLocal(anyString());
758
759         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
760
761         // negative test case with null as the reply
762         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
763             any(ActorSelection.class), eqReadData());
764
765         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
766             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
767
768         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
769
770         // test case with node as read data reply
771         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
772
773         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
774             any(ActorSelection.class), eqReadData());
775
776         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
777
778         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
779
780         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
781
782         // test for local data exists
783         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
784             any(ActorSelection.class), eqDataExists());
785
786         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
787
788         assertEquals("Exists response", true, exists);
789     }
790
791     @Test
792     public void testLocalTxActorReady() throws Exception {
793         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
794         doReturn(true).when(mockActorContext).isPathLocal(anyString());
795
796         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
797                 any(ActorSelection.class), isA(BatchedModifications.class));
798
799         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
800
801         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
802         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
803
804         // testing ready
805         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
806             eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
807
808         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
809
810         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
811
812         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
813
814         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
815     }
816
817     private static interface TransactionProxyOperation {
818         void run(TransactionProxy transactionProxy);
819     }
820
821     private void throttleOperation(TransactionProxyOperation operation) {
822         throttleOperation(operation, 1, true);
823     }
824
825     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
826         ActorSystem actorSystem = getSystem();
827         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
828
829         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
830
831         doReturn(actorSystem.actorSelection(shardActorRef.path())).
832                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
833
834         if(shardFound) {
835             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
836                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
837         } else {
838             doReturn(Futures.failed(new Exception("not found")))
839                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
840         }
841
842         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
843         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
844                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
845                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
846
847         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
848                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
849                         eqCreateTransaction(memberName, READ_WRITE));
850
851         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
852
853         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
854
855         long start = System.nanoTime();
856
857         operation.run(transactionProxy);
858
859         long end = System.nanoTime();
860
861         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
862         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
863                 expected, (end-start)), (end - start) > expected);
864
865     }
866
867     private void completeOperation(TransactionProxyOperation operation){
868         completeOperation(operation, true);
869     }
870
871     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
872         ActorSystem actorSystem = getSystem();
873         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
874
875         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
876
877         doReturn(actorSystem.actorSelection(shardActorRef.path())).
878                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
879
880         if(shardFound) {
881             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
882                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
883         } else {
884             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
885                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
886         }
887
888         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
889         String actorPath = txActorRef.path().toString();
890         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
891                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
892                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
893
894         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
895
896         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
897                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
898                         eqCreateTransaction(memberName, READ_WRITE));
899
900         doReturn(true).when(mockActorContext).isPathLocal(anyString());
901
902         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
903
904         long start = System.nanoTime();
905
906         operation.run(transactionProxy);
907
908         long end = System.nanoTime();
909
910         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
911         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
912                 expected, (end-start)), (end - start) <= expected);
913     }
914
915     public void testWriteThrottling(boolean shardFound){
916
917         throttleOperation(new TransactionProxyOperation() {
918             @Override
919             public void run(TransactionProxy transactionProxy) {
920                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
921
922                 expectBatchedModifications(2);
923
924                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
925
926                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
927             }
928         }, 1, shardFound);
929     }
930
931     @Test
932     public void testWriteThrottlingWhenShardFound(){
933         dataStoreContextBuilder.shardBatchedModificationCount(1);
934         throttleOperation(new TransactionProxyOperation() {
935             @Override
936             public void run(TransactionProxy transactionProxy) {
937                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
938
939                 expectIncompleteBatchedModifications();
940
941                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
942
943                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
944             }
945         });
946     }
947
948     @Test
949     public void testWriteThrottlingWhenShardNotFound(){
950         // Confirm that there is no throttling when the Shard is not found
951         dataStoreContextBuilder.shardBatchedModificationCount(1);
952         completeOperation(new TransactionProxyOperation() {
953             @Override
954             public void run(TransactionProxy transactionProxy) {
955                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
956
957                 expectBatchedModifications(2);
958
959                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
960
961                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
962             }
963         }, false);
964
965     }
966
967
968     @Test
969     public void testWriteCompletion(){
970         dataStoreContextBuilder.shardBatchedModificationCount(1);
971         completeOperation(new TransactionProxyOperation() {
972             @Override
973             public void run(TransactionProxy transactionProxy) {
974                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
975
976                 expectBatchedModifications(2);
977
978                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
979
980                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
981             }
982         });
983     }
984
985     @Test
986     public void testMergeThrottlingWhenShardFound(){
987         dataStoreContextBuilder.shardBatchedModificationCount(1);
988         throttleOperation(new TransactionProxyOperation() {
989             @Override
990             public void run(TransactionProxy transactionProxy) {
991                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992
993                 expectIncompleteBatchedModifications();
994
995                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
996
997                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
998             }
999         });
1000     }
1001
1002     @Test
1003     public void testMergeThrottlingWhenShardNotFound(){
1004         dataStoreContextBuilder.shardBatchedModificationCount(1);
1005         completeOperation(new TransactionProxyOperation() {
1006             @Override
1007             public void run(TransactionProxy transactionProxy) {
1008                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1009
1010                 expectBatchedModifications(2);
1011
1012                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1013
1014                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1015             }
1016         }, false);
1017     }
1018
1019     @Test
1020     public void testMergeCompletion(){
1021         dataStoreContextBuilder.shardBatchedModificationCount(1);
1022         completeOperation(new TransactionProxyOperation() {
1023             @Override
1024             public void run(TransactionProxy transactionProxy) {
1025                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1026
1027                 expectBatchedModifications(2);
1028
1029                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1030
1031                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1032             }
1033         });
1034
1035     }
1036
1037     @Test
1038     public void testDeleteThrottlingWhenShardFound(){
1039
1040         throttleOperation(new TransactionProxyOperation() {
1041             @Override
1042             public void run(TransactionProxy transactionProxy) {
1043                 expectIncompleteBatchedModifications();
1044
1045                 transactionProxy.delete(TestModel.TEST_PATH);
1046
1047                 transactionProxy.delete(TestModel.TEST_PATH);
1048             }
1049         });
1050     }
1051
1052
1053     @Test
1054     public void testDeleteThrottlingWhenShardNotFound(){
1055
1056         completeOperation(new TransactionProxyOperation() {
1057             @Override
1058             public void run(TransactionProxy transactionProxy) {
1059                 expectBatchedModifications(2);
1060
1061                 transactionProxy.delete(TestModel.TEST_PATH);
1062
1063                 transactionProxy.delete(TestModel.TEST_PATH);
1064             }
1065         }, false);
1066     }
1067
1068     @Test
1069     public void testDeleteCompletion(){
1070         dataStoreContextBuilder.shardBatchedModificationCount(1);
1071         completeOperation(new TransactionProxyOperation() {
1072             @Override
1073             public void run(TransactionProxy transactionProxy) {
1074                 expectBatchedModifications(2);
1075
1076                 transactionProxy.delete(TestModel.TEST_PATH);
1077
1078                 transactionProxy.delete(TestModel.TEST_PATH);
1079             }
1080         });
1081
1082     }
1083
1084     @Test
1085     public void testReadThrottlingWhenShardFound(){
1086
1087         throttleOperation(new TransactionProxyOperation() {
1088             @Override
1089             public void run(TransactionProxy transactionProxy) {
1090                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1091                         any(ActorSelection.class), eqReadData());
1092
1093                 transactionProxy.read(TestModel.TEST_PATH);
1094
1095                 transactionProxy.read(TestModel.TEST_PATH);
1096             }
1097         });
1098     }
1099
1100     @Test
1101     public void testReadThrottlingWhenShardNotFound(){
1102
1103         completeOperation(new TransactionProxyOperation() {
1104             @Override
1105             public void run(TransactionProxy transactionProxy) {
1106                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1107                         any(ActorSelection.class), eqReadData());
1108
1109                 transactionProxy.read(TestModel.TEST_PATH);
1110
1111                 transactionProxy.read(TestModel.TEST_PATH);
1112             }
1113         }, false);
1114     }
1115
1116
1117     @Test
1118     public void testReadCompletion(){
1119         completeOperation(new TransactionProxyOperation() {
1120             @Override
1121             public void run(TransactionProxy transactionProxy) {
1122                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1123
1124                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1125                         any(ActorSelection.class), eqReadData());
1126
1127                 transactionProxy.read(TestModel.TEST_PATH);
1128
1129                 transactionProxy.read(TestModel.TEST_PATH);
1130             }
1131         });
1132
1133     }
1134
1135     @Test
1136     public void testExistsThrottlingWhenShardFound(){
1137
1138         throttleOperation(new TransactionProxyOperation() {
1139             @Override
1140             public void run(TransactionProxy transactionProxy) {
1141                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1142                         any(ActorSelection.class), eqDataExists());
1143
1144                 transactionProxy.exists(TestModel.TEST_PATH);
1145
1146                 transactionProxy.exists(TestModel.TEST_PATH);
1147             }
1148         });
1149     }
1150
1151     @Test
1152     public void testExistsThrottlingWhenShardNotFound(){
1153
1154         completeOperation(new TransactionProxyOperation() {
1155             @Override
1156             public void run(TransactionProxy transactionProxy) {
1157                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1158                         any(ActorSelection.class), eqDataExists());
1159
1160                 transactionProxy.exists(TestModel.TEST_PATH);
1161
1162                 transactionProxy.exists(TestModel.TEST_PATH);
1163             }
1164         }, false);
1165     }
1166
1167
1168     @Test
1169     public void testExistsCompletion(){
1170         completeOperation(new TransactionProxyOperation() {
1171             @Override
1172             public void run(TransactionProxy transactionProxy) {
1173                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1174                         any(ActorSelection.class), eqDataExists());
1175
1176                 transactionProxy.exists(TestModel.TEST_PATH);
1177
1178                 transactionProxy.exists(TestModel.TEST_PATH);
1179             }
1180         });
1181
1182     }
1183
1184     @Test
1185     public void testReadyThrottling(){
1186
1187         throttleOperation(new TransactionProxyOperation() {
1188             @Override
1189             public void run(TransactionProxy transactionProxy) {
1190                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1191
1192                 expectBatchedModifications(1);
1193
1194                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1195                         any(ActorSelection.class), any(ReadyTransaction.class));
1196
1197                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1198
1199                 transactionProxy.ready();
1200             }
1201         });
1202     }
1203
1204     @Test
1205     public void testReadyThrottlingWithTwoTransactionContexts(){
1206
1207         throttleOperation(new TransactionProxyOperation() {
1208             @Override
1209             public void run(TransactionProxy transactionProxy) {
1210                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1211                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1212
1213                 expectBatchedModifications(2);
1214
1215                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1216                         any(ActorSelection.class), any(ReadyTransaction.class));
1217
1218                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1219
1220                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1221
1222                 transactionProxy.ready();
1223             }
1224         }, 2, true);
1225     }
1226
1227     private void testModificationOperationBatching(TransactionType type) throws Exception {
1228         int shardBatchedModificationCount = 3;
1229         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1230
1231         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1232
1233         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1234
1235         expectReadyTransaction(actorRef);
1236
1237         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1238         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1239
1240         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1241         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1242
1243         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1244         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1245
1246         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1247         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1248
1249         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1250         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1251
1252         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1253         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1254
1255         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1256         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1257
1258         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1259
1260         transactionProxy.write(writePath1, writeNode1);
1261         transactionProxy.write(writePath2, writeNode2);
1262         transactionProxy.delete(deletePath1);
1263         transactionProxy.merge(mergePath1, mergeNode1);
1264         transactionProxy.merge(mergePath2, mergeNode2);
1265         transactionProxy.write(writePath3, writeNode3);
1266         transactionProxy.merge(mergePath3, mergeNode3);
1267         transactionProxy.delete(deletePath2);
1268
1269         // This sends the last batch.
1270         transactionProxy.ready();
1271
1272         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1273         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1274
1275         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1276                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1277
1278         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1279                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1280
1281         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
1282         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
1283                 new DeleteModification(deletePath2));
1284
1285         if(optimizedWriteOnly) {
1286             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1287                     BatchedModificationsReply.class, BatchedModificationsReply.class);
1288         } else {
1289             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1290                     BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1291         }
1292     }
1293
1294     @Test
1295     public void testReadWriteModificationOperationBatching() throws Throwable {
1296         testModificationOperationBatching(READ_WRITE);
1297     }
1298
1299     @Test
1300     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1301         testModificationOperationBatching(WRITE_ONLY);
1302     }
1303
1304     @Test
1305     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1306         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1307         testModificationOperationBatching(WRITE_ONLY);
1308     }
1309
1310     @Test
1311     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1312
1313         int shardBatchedModificationCount = 10;
1314         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1315
1316         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1317
1318         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1319
1320         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1321         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1322
1323         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1324         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1325
1326         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1327         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1328
1329         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1330         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1331
1332         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1333
1334         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1335                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1336
1337         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1338                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1339
1340         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1341                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1342
1343         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1344
1345         transactionProxy.write(writePath1, writeNode1);
1346         transactionProxy.write(writePath2, writeNode2);
1347
1348         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1349                 get(5, TimeUnit.SECONDS);
1350
1351         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1352         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1353
1354         transactionProxy.merge(mergePath1, mergeNode1);
1355         transactionProxy.merge(mergePath2, mergeNode2);
1356
1357         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1358
1359         transactionProxy.delete(deletePath);
1360
1361         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1362         assertEquals("Exists response", true, exists);
1363
1364         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1365         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1366
1367         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1368         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1369
1370         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1371                 new WriteModification(writePath2, writeNode2));
1372
1373         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1374                 new MergeModification(mergePath2, mergeNode2));
1375
1376         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1377
1378         InOrder inOrder = Mockito.inOrder(mockActorContext);
1379         inOrder.verify(mockActorContext).executeOperationAsync(
1380                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1381
1382         inOrder.verify(mockActorContext).executeOperationAsync(
1383                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1384
1385         inOrder.verify(mockActorContext).executeOperationAsync(
1386                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1387
1388         inOrder.verify(mockActorContext).executeOperationAsync(
1389                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1390
1391         inOrder.verify(mockActorContext).executeOperationAsync(
1392                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1393
1394         inOrder.verify(mockActorContext).executeOperationAsync(
1395                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1396
1397         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1398                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1399     }
1400 }