Merge "Set unable-to-connect(netconf node) if no sources are available"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import com.google.common.base.Optional;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.InOrder;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
48 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
49 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import scala.concurrent.Await;
58 import scala.concurrent.Future;
59 import scala.concurrent.Promise;
60 import scala.concurrent.duration.Duration;
61
62 @SuppressWarnings("resource")
63 public class TransactionProxyTest extends AbstractTransactionProxyTest {
64
65     @SuppressWarnings("serial")
66     static class TestException extends RuntimeException {
67     }
68
69     static interface Invoker {
70         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
71     }
72
73     @Test
74     public void testRead() throws Exception {
75         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
76
77         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
78
79         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
80                 eq(actorSelection(actorRef)), eqSerializedReadData());
81
82         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
83                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
84
85         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
86
87         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
88
89         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
90                 eq(actorSelection(actorRef)), eqSerializedReadData());
91
92         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
93
94         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
95
96         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
97     }
98
99     @Test(expected = ReadFailedException.class)
100     public void testReadWithInvalidReplyMessageType() throws Exception {
101         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
102
103         doReturn(Futures.successful(new Object())).when(mockActorContext).
104                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
105
106         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
107
108         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
109     }
110
111     @Test(expected = TestException.class)
112     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
113         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
114
115         doReturn(Futures.failed(new TestException())).when(mockActorContext).
116                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
117
118         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
119
120         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
121     }
122
123     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
124             throws Throwable {
125         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
126
127         if (exToThrow instanceof PrimaryNotFoundException) {
128             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
129         } else {
130             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
131                     when(mockActorContext).findPrimaryShardAsync(anyString());
132         }
133
134         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
135                 any(ActorSelection.class), any());
136
137         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
138
139         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
140     }
141
142     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
143         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
144             @Override
145             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
146                 return proxy.read(TestModel.TEST_PATH);
147             }
148         });
149     }
150
151     @Test(expected = PrimaryNotFoundException.class)
152     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
153         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
154     }
155
156     @Test(expected = TimeoutException.class)
157     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
158         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
159                 new Exception("reason")));
160     }
161
162     @Test(expected = TestException.class)
163     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
164         testReadWithExceptionOnInitialCreateTransaction(new TestException());
165     }
166
167     @Test(expected = TestException.class)
168     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
169         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
170                 when(mockActorContext).getDatastoreContext();
171
172         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
173
174         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
175
176         expectFailedBatchedModifications(actorRef);
177
178         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
179                 eq(actorSelection(actorRef)), eqSerializedReadData());
180
181         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
182
183         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
184
185         transactionProxy.delete(TestModel.TEST_PATH);
186
187         try {
188             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
189         } finally {
190             verify(mockActorContext, times(0)).executeOperationAsync(
191                     eq(actorSelection(actorRef)), eqSerializedReadData());
192         }
193     }
194
195     @Test
196     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
197         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
198
199         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
200
201         expectBatchedModifications(actorRef, 1);
202
203         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
204                 eq(actorSelection(actorRef)), eqSerializedReadData());
205
206         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
207
208         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
209
210         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
211                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
212
213         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
214         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
215
216         InOrder inOrder = Mockito.inOrder(mockActorContext);
217         inOrder.verify(mockActorContext).executeOperationAsync(
218                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
219
220         inOrder.verify(mockActorContext).executeOperationAsync(
221                 eq(actorSelection(actorRef)), eqSerializedReadData());
222     }
223
224     @Test(expected=IllegalStateException.class)
225     public void testReadPreConditionCheck() {
226         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
227         transactionProxy.read(TestModel.TEST_PATH);
228     }
229
230     @Test(expected=IllegalArgumentException.class)
231     public void testInvalidCreateTransactionReply() throws Throwable {
232         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
233
234         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
235             actorSelection(actorRef.path().toString());
236
237         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
238             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
239
240         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
241             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
242
243         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
244
245         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
246     }
247
248     @Test
249     public void testExists() throws Exception {
250         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
251
252         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
253
254         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
255                 eq(actorSelection(actorRef)), eqSerializedDataExists());
256
257         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
258
259         assertEquals("Exists response", false, exists);
260
261         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
262                 eq(actorSelection(actorRef)), eqSerializedDataExists());
263
264         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
265
266         assertEquals("Exists response", true, exists);
267     }
268
269     @Test(expected = PrimaryNotFoundException.class)
270     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
271         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
272             @Override
273             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
274                 return proxy.exists(TestModel.TEST_PATH);
275             }
276         });
277     }
278
279     @Test(expected = ReadFailedException.class)
280     public void testExistsWithInvalidReplyMessageType() throws Exception {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.successful(new Object())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
287                 READ_ONLY);
288
289         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
290     }
291
292     @Test(expected = TestException.class)
293     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
294         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
295
296         doReturn(Futures.failed(new TestException())).when(mockActorContext).
297                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
298
299         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
300
301         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
302     }
303
304     @Test(expected = TestException.class)
305     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
306         doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
307                 when(mockActorContext).getDatastoreContext();
308
309         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
310
311         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
312
313         expectFailedBatchedModifications(actorRef);
314
315         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
316                 eq(actorSelection(actorRef)), eqSerializedDataExists());
317
318         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
319                 READ_WRITE);
320
321         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
322
323         transactionProxy.delete(TestModel.TEST_PATH);
324
325         try {
326             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
327         } finally {
328             verify(mockActorContext, times(0)).executeOperationAsync(
329                     eq(actorSelection(actorRef)), eqSerializedDataExists());
330         }
331     }
332
333     @Test
334     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
335         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
336
337         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
338
339         expectBatchedModifications(actorRef, 1);
340
341         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
342                 eq(actorSelection(actorRef)), eqSerializedDataExists());
343
344         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
345
346         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
347
348         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
349
350         assertEquals("Exists response", true, exists);
351
352         InOrder inOrder = Mockito.inOrder(mockActorContext);
353         inOrder.verify(mockActorContext).executeOperationAsync(
354                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
355
356         inOrder.verify(mockActorContext).executeOperationAsync(
357                 eq(actorSelection(actorRef)), eqSerializedDataExists());
358     }
359
360     @Test(expected=IllegalStateException.class)
361     public void testExistsPreConditionCheck() {
362         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
363         transactionProxy.exists(TestModel.TEST_PATH);
364     }
365
366     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
367             Class<?>... expResultTypes) throws Exception {
368         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
369
370         int i = 0;
371         for( Future<Object> future: futures) {
372             assertNotNull("Recording operation Future is null", future);
373
374             Class<?> expResultType = expResultTypes[i++];
375             if(Throwable.class.isAssignableFrom(expResultType)) {
376                 try {
377                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
378                     fail("Expected exception from recording operation Future");
379                 } catch(Exception e) {
380                     // Expected
381                 }
382             } else {
383                 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
384                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
385             }
386         }
387     }
388
389     @Test
390     public void testWrite() throws Exception {
391         dataStoreContextBuilder.shardBatchedModificationCount(1);
392         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
393
394         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
395
396         expectBatchedModifications(actorRef, 1);
397
398         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
399
400         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
401
402         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
403     }
404
405     @Test
406     public void testWriteAfterAsyncRead() throws Throwable {
407         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
408
409         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
410         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
411                 eq(getSystem().actorSelection(actorRef.path())),
412                 eqCreateTransaction(memberName, READ_WRITE));
413
414         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
415                 eq(actorSelection(actorRef)), eqSerializedReadData());
416
417         expectBatchedModifications(actorRef, 1);
418         expectReadyTransaction(actorRef);
419
420         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
421
422         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
423
424         final CountDownLatch readComplete = new CountDownLatch(1);
425         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
426         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
427                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
428                     @Override
429                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
430                         try {
431                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
432                         } catch (Exception e) {
433                             caughtEx.set(e);
434                         } finally {
435                             readComplete.countDown();
436                         }
437                     }
438
439                     @Override
440                     public void onFailure(Throwable t) {
441                         caughtEx.set(t);
442                         readComplete.countDown();
443                     }
444                 });
445
446         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
447
448         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
449
450         if(caughtEx.get() != null) {
451             throw caughtEx.get();
452         }
453
454         // This sends the batched modification.
455         transactionProxy.ready();
456
457         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
458
459         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
460                 BatchedModificationsReply.class);
461     }
462
463     @Test(expected=IllegalStateException.class)
464     public void testWritePreConditionCheck() {
465         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
466         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
467     }
468
469     @Test(expected=IllegalStateException.class)
470     public void testWriteAfterReadyPreConditionCheck() {
471         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
472
473         transactionProxy.ready();
474
475         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
476     }
477
478     @Test
479     public void testMerge() throws Exception {
480         dataStoreContextBuilder.shardBatchedModificationCount(1);
481         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
482
483         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
484
485         expectBatchedModifications(actorRef, 1);
486
487         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
488
489         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
490
491         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
492     }
493
494     @Test
495     public void testDelete() throws Exception {
496         dataStoreContextBuilder.shardBatchedModificationCount(1);
497         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
498
499         expectBatchedModifications(actorRef, 1);
500
501         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
502
503         transactionProxy.delete(TestModel.TEST_PATH);
504
505         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
506     }
507
508     @Test
509     public void testReadyWithReadWrite() throws Exception {
510         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
511
512         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
513
514         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
515                 eq(actorSelection(actorRef)), eqSerializedReadData());
516
517         expectBatchedModifications(actorRef, 1);
518         expectReadyTransaction(actorRef);
519
520         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
521
522         transactionProxy.read(TestModel.TEST_PATH);
523
524         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
525
526         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
527
528         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
529
530         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
531
532         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
533                 BatchedModificationsReply.class);
534
535         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
536
537         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
538                 isA(BatchedModifications.class));
539
540         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
541                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
542     }
543
544     @Test
545     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
546         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
547
548         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
549
550         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
551
552         expectBatchedModificationsReady(actorRef, 1);
553
554         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
555
556         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
557
558         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
559
560         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
561
562         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
563
564         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
565
566         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
567
568         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
569         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
570
571         verifyBatchedModifications(batchedModifications.get(0), true,
572                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
573
574         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
575                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
576     }
577
578     @Test
579     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
580         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
581         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
582
583         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
584
585         expectBatchedModificationsReady(actorRef, 1);
586
587         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
588
589         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
590
591         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
592
593         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
594
595         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
596
597         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
598                 BatchedModificationsReply.class);
599
600         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
601
602         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
603         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
604
605         verifyBatchedModifications(batchedModifications.get(0), false,
606                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
607
608         verifyBatchedModifications(batchedModifications.get(1), true);
609
610         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
611                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
612     }
613
614     @Test
615     public void testReadyWithRecordingOperationFailure() throws Exception {
616         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
617
618         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
619
620         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
621
622         expectFailedBatchedModifications(actorRef);
623
624         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
625
626         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
627
628         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
629
630         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
631
632         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
633
634         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
635
636         verifyCohortFutures(proxy, TestException.class);
637
638         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
639     }
640
641     @Test
642     public void testReadyWithReplyFailure() throws Exception {
643         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
644
645         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
646
647         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
648
649         expectFailedBatchedModifications(actorRef);
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
652
653         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
654
655         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
656
657         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
658
659         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
660
661         verifyCohortFutures(proxy, TestException.class);
662     }
663
664     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
665         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
666
667         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
668
669         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
670
671         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
672
673         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
674
675         transactionProxy.delete(TestModel.TEST_PATH);
676
677         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
678
679         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
680
681         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
682
683         verifyCohortFutures(proxy, toThrow.getClass());
684     }
685
686     @Test
687     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
688         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
689     }
690
691     @Test
692     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
693         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
694     }
695
696     @Test
697     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
698         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
699     }
700
701     @Test
702     public void testReadyWithInvalidReplyMessageType() throws Exception {
703         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
704         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
705
706         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
707
708         //expectBatchedModifications(actorRef, 1);
709
710         doReturn(Futures.successful(new Object())).when(mockActorContext).
711                 executeOperationAsync(eq(actorSelection(actorRef)),
712                         isA(BatchedModifications.class));
713
714         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
715
716         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
717
718         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
719
720         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
721
722         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
723
724         verifyCohortFutures(proxy, IllegalArgumentException.class);
725     }
726
727     @Test
728     public void testGetIdentifier() {
729         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
730         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
731                 TransactionProxy.TransactionType.READ_ONLY);
732
733         Object id = transactionProxy.getIdentifier();
734         assertNotNull("getIdentifier returned null", id);
735         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
736     }
737
738     @Test
739     public void testClose() throws Exception{
740         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
741
742         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
743                 eq(actorSelection(actorRef)), eqSerializedReadData());
744
745         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
746
747         transactionProxy.read(TestModel.TEST_PATH);
748
749         transactionProxy.close();
750
751         verify(mockActorContext).sendOperationAsync(
752                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
753     }
754
755
756     /**
757      * Method to test a local Tx actor. The Tx paths are matched to decide if the
758      * Tx actor is local or not. This is done by mocking the Tx actor path
759      * and the caller paths and ensuring that the paths have the remote-address format
760      *
761      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
762      * the paths returned for the actors for all the tests are not qualified remote paths.
763      * Hence are treated as non-local/remote actors. In short, all tests except
764      * few below run for remote actors
765      *
766      * @throws Exception
767      */
768     @Test
769     public void testLocalTxActorRead() throws Exception {
770         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
771         doReturn(true).when(mockActorContext).isPathLocal(anyString());
772
773         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
774
775         // negative test case with null as the reply
776         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
777             any(ActorSelection.class), eqReadData());
778
779         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
780             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
781
782         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
783
784         // test case with node as read data reply
785         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
786
787         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
788             any(ActorSelection.class), eqReadData());
789
790         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
791
792         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
793
794         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
795
796         // test for local data exists
797         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
798             any(ActorSelection.class), eqDataExists());
799
800         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
801
802         assertEquals("Exists response", true, exists);
803     }
804
805     @Test
806     public void testLocalTxActorReady() throws Exception {
807         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
808         doReturn(true).when(mockActorContext).isPathLocal(anyString());
809
810         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
811                 any(ActorSelection.class), isA(BatchedModifications.class));
812
813         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
814
815         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
816         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
817
818         // testing ready
819         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
820             eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
821
822         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
823
824         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
825
826         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
827
828         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
829     }
830
831     private static interface TransactionProxyOperation {
832         void run(TransactionProxy transactionProxy);
833     }
834
835     private void throttleOperation(TransactionProxyOperation operation) {
836         throttleOperation(operation, 1, true);
837     }
838
839     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
840         ActorSystem actorSystem = getSystem();
841         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
842
843         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
844
845         doReturn(actorSystem.actorSelection(shardActorRef.path())).
846                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
847
848         if(shardFound) {
849             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
850                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
851         } else {
852             doReturn(Futures.failed(new Exception("not found")))
853                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
854         }
855
856         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
857         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
858                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
859                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
860
861         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
862                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
863                         eqCreateTransaction(memberName, READ_WRITE));
864
865         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
866
867         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
868
869         long start = System.nanoTime();
870
871         operation.run(transactionProxy);
872
873         long end = System.nanoTime();
874
875         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
876         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
877                 expected, (end-start)), (end - start) > expected);
878
879     }
880
881     private void completeOperation(TransactionProxyOperation operation){
882         completeOperation(operation, true);
883     }
884
885     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
886         ActorSystem actorSystem = getSystem();
887         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
888
889         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
890
891         doReturn(actorSystem.actorSelection(shardActorRef.path())).
892                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
893
894         if(shardFound) {
895             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
896                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
897         } else {
898             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
899                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
900         }
901
902         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
903         String actorPath = txActorRef.path().toString();
904         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
905                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
906                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
907
908         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
909
910         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
911                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
912                         eqCreateTransaction(memberName, READ_WRITE));
913
914         doReturn(true).when(mockActorContext).isPathLocal(anyString());
915
916         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
917
918         long start = System.nanoTime();
919
920         operation.run(transactionProxy);
921
922         long end = System.nanoTime();
923
924         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
925         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
926                 expected, (end-start)), (end - start) <= expected);
927     }
928
929     public void testWriteThrottling(boolean shardFound){
930
931         throttleOperation(new TransactionProxyOperation() {
932             @Override
933             public void run(TransactionProxy transactionProxy) {
934                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
935
936                 expectBatchedModifications(2);
937
938                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
939
940                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
941             }
942         }, 1, shardFound);
943     }
944
945     @Test
946     public void testWriteThrottlingWhenShardFound(){
947         dataStoreContextBuilder.shardBatchedModificationCount(1);
948         throttleOperation(new TransactionProxyOperation() {
949             @Override
950             public void run(TransactionProxy transactionProxy) {
951                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
952
953                 expectIncompleteBatchedModifications();
954
955                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
956
957                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
958             }
959         });
960     }
961
962     @Test
963     public void testWriteThrottlingWhenShardNotFound(){
964         // Confirm that there is no throttling when the Shard is not found
965         dataStoreContextBuilder.shardBatchedModificationCount(1);
966         completeOperation(new TransactionProxyOperation() {
967             @Override
968             public void run(TransactionProxy transactionProxy) {
969                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
970
971                 expectBatchedModifications(2);
972
973                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
974
975                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
976             }
977         }, false);
978
979     }
980
981
982     @Test
983     public void testWriteCompletion(){
984         dataStoreContextBuilder.shardBatchedModificationCount(1);
985         completeOperation(new TransactionProxyOperation() {
986             @Override
987             public void run(TransactionProxy transactionProxy) {
988                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
989
990                 expectBatchedModifications(2);
991
992                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
993
994                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
995             }
996         });
997     }
998
999     @Test
1000     public void testMergeThrottlingWhenShardFound(){
1001         dataStoreContextBuilder.shardBatchedModificationCount(1);
1002         throttleOperation(new TransactionProxyOperation() {
1003             @Override
1004             public void run(TransactionProxy transactionProxy) {
1005                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1006
1007                 expectIncompleteBatchedModifications();
1008
1009                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1010
1011                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1012             }
1013         });
1014     }
1015
1016     @Test
1017     public void testMergeThrottlingWhenShardNotFound(){
1018         dataStoreContextBuilder.shardBatchedModificationCount(1);
1019         completeOperation(new TransactionProxyOperation() {
1020             @Override
1021             public void run(TransactionProxy transactionProxy) {
1022                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1023
1024                 expectBatchedModifications(2);
1025
1026                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1027
1028                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1029             }
1030         }, false);
1031     }
1032
1033     @Test
1034     public void testMergeCompletion(){
1035         dataStoreContextBuilder.shardBatchedModificationCount(1);
1036         completeOperation(new TransactionProxyOperation() {
1037             @Override
1038             public void run(TransactionProxy transactionProxy) {
1039                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1040
1041                 expectBatchedModifications(2);
1042
1043                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1044
1045                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1046             }
1047         });
1048
1049     }
1050
1051     @Test
1052     public void testDeleteThrottlingWhenShardFound(){
1053
1054         throttleOperation(new TransactionProxyOperation() {
1055             @Override
1056             public void run(TransactionProxy transactionProxy) {
1057                 expectIncompleteBatchedModifications();
1058
1059                 transactionProxy.delete(TestModel.TEST_PATH);
1060
1061                 transactionProxy.delete(TestModel.TEST_PATH);
1062             }
1063         });
1064     }
1065
1066
1067     @Test
1068     public void testDeleteThrottlingWhenShardNotFound(){
1069
1070         completeOperation(new TransactionProxyOperation() {
1071             @Override
1072             public void run(TransactionProxy transactionProxy) {
1073                 expectBatchedModifications(2);
1074
1075                 transactionProxy.delete(TestModel.TEST_PATH);
1076
1077                 transactionProxy.delete(TestModel.TEST_PATH);
1078             }
1079         }, false);
1080     }
1081
1082     @Test
1083     public void testDeleteCompletion(){
1084         dataStoreContextBuilder.shardBatchedModificationCount(1);
1085         completeOperation(new TransactionProxyOperation() {
1086             @Override
1087             public void run(TransactionProxy transactionProxy) {
1088                 expectBatchedModifications(2);
1089
1090                 transactionProxy.delete(TestModel.TEST_PATH);
1091
1092                 transactionProxy.delete(TestModel.TEST_PATH);
1093             }
1094         });
1095
1096     }
1097
1098     @Test
1099     public void testReadThrottlingWhenShardFound(){
1100
1101         throttleOperation(new TransactionProxyOperation() {
1102             @Override
1103             public void run(TransactionProxy transactionProxy) {
1104                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1105                         any(ActorSelection.class), eqReadData());
1106
1107                 transactionProxy.read(TestModel.TEST_PATH);
1108
1109                 transactionProxy.read(TestModel.TEST_PATH);
1110             }
1111         });
1112     }
1113
1114     @Test
1115     public void testReadThrottlingWhenShardNotFound(){
1116
1117         completeOperation(new TransactionProxyOperation() {
1118             @Override
1119             public void run(TransactionProxy transactionProxy) {
1120                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1121                         any(ActorSelection.class), eqReadData());
1122
1123                 transactionProxy.read(TestModel.TEST_PATH);
1124
1125                 transactionProxy.read(TestModel.TEST_PATH);
1126             }
1127         }, false);
1128     }
1129
1130
1131     @Test
1132     public void testReadCompletion(){
1133         completeOperation(new TransactionProxyOperation() {
1134             @Override
1135             public void run(TransactionProxy transactionProxy) {
1136                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1137
1138                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1139                         any(ActorSelection.class), eqReadData());
1140
1141                 transactionProxy.read(TestModel.TEST_PATH);
1142
1143                 transactionProxy.read(TestModel.TEST_PATH);
1144             }
1145         });
1146
1147     }
1148
1149     @Test
1150     public void testExistsThrottlingWhenShardFound(){
1151
1152         throttleOperation(new TransactionProxyOperation() {
1153             @Override
1154             public void run(TransactionProxy transactionProxy) {
1155                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1156                         any(ActorSelection.class), eqDataExists());
1157
1158                 transactionProxy.exists(TestModel.TEST_PATH);
1159
1160                 transactionProxy.exists(TestModel.TEST_PATH);
1161             }
1162         });
1163     }
1164
1165     @Test
1166     public void testExistsThrottlingWhenShardNotFound(){
1167
1168         completeOperation(new TransactionProxyOperation() {
1169             @Override
1170             public void run(TransactionProxy transactionProxy) {
1171                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1172                         any(ActorSelection.class), eqDataExists());
1173
1174                 transactionProxy.exists(TestModel.TEST_PATH);
1175
1176                 transactionProxy.exists(TestModel.TEST_PATH);
1177             }
1178         }, false);
1179     }
1180
1181
1182     @Test
1183     public void testExistsCompletion(){
1184         completeOperation(new TransactionProxyOperation() {
1185             @Override
1186             public void run(TransactionProxy transactionProxy) {
1187                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1188                         any(ActorSelection.class), eqDataExists());
1189
1190                 transactionProxy.exists(TestModel.TEST_PATH);
1191
1192                 transactionProxy.exists(TestModel.TEST_PATH);
1193             }
1194         });
1195
1196     }
1197
1198     @Test
1199     public void testReadyThrottling(){
1200
1201         throttleOperation(new TransactionProxyOperation() {
1202             @Override
1203             public void run(TransactionProxy transactionProxy) {
1204                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1205
1206                 expectBatchedModifications(1);
1207
1208                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1209                         any(ActorSelection.class), any(ReadyTransaction.class));
1210
1211                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1212
1213                 transactionProxy.ready();
1214             }
1215         });
1216     }
1217
1218     @Test
1219     public void testReadyThrottlingWithTwoTransactionContexts(){
1220
1221         throttleOperation(new TransactionProxyOperation() {
1222             @Override
1223             public void run(TransactionProxy transactionProxy) {
1224                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1225                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1226
1227                 expectBatchedModifications(2);
1228
1229                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1230                         any(ActorSelection.class), any(ReadyTransaction.class));
1231
1232                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1233
1234                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1235
1236                 transactionProxy.ready();
1237             }
1238         }, 2, true);
1239     }
1240
1241     private void testModificationOperationBatching(TransactionType type) throws Exception {
1242         int shardBatchedModificationCount = 3;
1243         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1244
1245         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1246
1247         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1248
1249         expectReadyTransaction(actorRef);
1250
1251         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1252         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1253
1254         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1255         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1256
1257         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1258         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1259
1260         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1261         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1262
1263         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1264         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1265
1266         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1267         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1268
1269         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1270         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1271
1272         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1273
1274         transactionProxy.write(writePath1, writeNode1);
1275         transactionProxy.write(writePath2, writeNode2);
1276         transactionProxy.delete(deletePath1);
1277         transactionProxy.merge(mergePath1, mergeNode1);
1278         transactionProxy.merge(mergePath2, mergeNode2);
1279         transactionProxy.write(writePath3, writeNode3);
1280         transactionProxy.merge(mergePath3, mergeNode3);
1281         transactionProxy.delete(deletePath2);
1282
1283         // This sends the last batch.
1284         transactionProxy.ready();
1285
1286         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1287         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1288
1289         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1290                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1291
1292         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1293                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1294
1295         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
1296         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
1297                 new DeleteModification(deletePath2));
1298
1299         if(optimizedWriteOnly) {
1300             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1301                     BatchedModificationsReply.class, BatchedModificationsReply.class);
1302         } else {
1303             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1304                     BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1305         }
1306     }
1307
1308     @Test
1309     public void testReadWriteModificationOperationBatching() throws Throwable {
1310         testModificationOperationBatching(READ_WRITE);
1311     }
1312
1313     @Test
1314     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1315         testModificationOperationBatching(WRITE_ONLY);
1316     }
1317
1318     @Test
1319     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1320         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1321         testModificationOperationBatching(WRITE_ONLY);
1322     }
1323
1324     @Test
1325     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1326
1327         int shardBatchedModificationCount = 10;
1328         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1329
1330         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1331
1332         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1333
1334         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1335         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1336
1337         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1338         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1339
1340         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1341         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1342
1343         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1344         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1345
1346         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1347
1348         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1349                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1350
1351         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1352                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1353
1354         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1355                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1356
1357         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1358
1359         transactionProxy.write(writePath1, writeNode1);
1360         transactionProxy.write(writePath2, writeNode2);
1361
1362         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1363                 get(5, TimeUnit.SECONDS);
1364
1365         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1366         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1367
1368         transactionProxy.merge(mergePath1, mergeNode1);
1369         transactionProxy.merge(mergePath2, mergeNode2);
1370
1371         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1372
1373         transactionProxy.delete(deletePath);
1374
1375         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1376         assertEquals("Exists response", true, exists);
1377
1378         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1379         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1380
1381         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1382         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1383
1384         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1385                 new WriteModification(writePath2, writeNode2));
1386
1387         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1388                 new MergeModification(mergePath2, mergeNode2));
1389
1390         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1391
1392         InOrder inOrder = Mockito.inOrder(mockActorContext);
1393         inOrder.verify(mockActorContext).executeOperationAsync(
1394                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1395
1396         inOrder.verify(mockActorContext).executeOperationAsync(
1397                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1398
1399         inOrder.verify(mockActorContext).executeOperationAsync(
1400                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1401
1402         inOrder.verify(mockActorContext).executeOperationAsync(
1403                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1404
1405         inOrder.verify(mockActorContext).executeOperationAsync(
1406                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1407
1408         inOrder.verify(mockActorContext).executeOperationAsync(
1409                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1410
1411         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1412                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1413     }
1414 }