CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
46 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
47 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
48 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
50 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
51 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
52 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import scala.concurrent.Promise;
65
66 @SuppressWarnings("resource")
67 public class TransactionProxyTest extends AbstractTransactionProxyTest {
68
69     @SuppressWarnings("serial")
70     static class TestException extends RuntimeException {
71     }
72
73     static interface Invoker {
74         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
75     }
76
77     @Test
78     public void testRead() throws Exception {
79         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
80
81         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
82
83         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
84                 eq(actorSelection(actorRef)), eqSerializedReadData());
85
86         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
87                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
88
89         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
90
91         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
92
93         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
94                 eq(actorSelection(actorRef)), eqSerializedReadData());
95
96         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
97
98         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
99
100         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
101     }
102
103     @Test(expected = ReadFailedException.class)
104     public void testReadWithInvalidReplyMessageType() throws Exception {
105         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
106
107         doReturn(Futures.successful(new Object())).when(mockActorContext).
108                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
109
110         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
111
112         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
113     }
114
115     @Test(expected = TestException.class)
116     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
117         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
118
119         doReturn(Futures.failed(new TestException())).when(mockActorContext).
120                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
121
122         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
123
124         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
125     }
126
127     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
128             throws Throwable {
129         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
130
131         if (exToThrow instanceof PrimaryNotFoundException) {
132             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
133         } else {
134             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
135                     when(mockActorContext).findPrimaryShardAsync(anyString());
136         }
137
138         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
139                 any(ActorSelection.class), any());
140
141         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
142
143         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
144     }
145
146     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
147         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
148             @Override
149             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
150                 return proxy.read(TestModel.TEST_PATH);
151             }
152         });
153     }
154
155     @Test(expected = PrimaryNotFoundException.class)
156     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
157         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
158     }
159
160     @Test(expected = TimeoutException.class)
161     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
162         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
163                 new Exception("reason")));
164     }
165
166     @Test(expected = TestException.class)
167     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new TestException());
169     }
170
171     @Test
172     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
173         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
174
175         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
176
177         expectBatchedModifications(actorRef, 1);
178
179         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
180                 eq(actorSelection(actorRef)), eqSerializedReadData());
181
182         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
183
184         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
185
186         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
187                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
188
189         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
190         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
191
192         InOrder inOrder = Mockito.inOrder(mockActorContext);
193         inOrder.verify(mockActorContext).executeOperationAsync(
194                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
195
196         inOrder.verify(mockActorContext).executeOperationAsync(
197                 eq(actorSelection(actorRef)), eqSerializedReadData());
198     }
199
200     @Test(expected=IllegalStateException.class)
201     public void testReadPreConditionCheck() {
202         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
203         transactionProxy.read(TestModel.TEST_PATH);
204     }
205
206     @Test(expected=IllegalArgumentException.class)
207     public void testInvalidCreateTransactionReply() throws Throwable {
208         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
209
210         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
211             actorSelection(actorRef.path().toString());
212
213         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
214             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
215
216         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
217             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
218
219         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
220
221         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
222     }
223
224     @Test
225     public void testExists() throws Exception {
226         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
227
228         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
229
230         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
231                 eq(actorSelection(actorRef)), eqSerializedDataExists());
232
233         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
234
235         assertEquals("Exists response", false, exists);
236
237         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
238                 eq(actorSelection(actorRef)), eqSerializedDataExists());
239
240         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
241
242         assertEquals("Exists response", true, exists);
243     }
244
245     @Test(expected = PrimaryNotFoundException.class)
246     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
247         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
248             @Override
249             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
250                 return proxy.exists(TestModel.TEST_PATH);
251             }
252         });
253     }
254
255     @Test(expected = ReadFailedException.class)
256     public void testExistsWithInvalidReplyMessageType() throws Exception {
257         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
258
259         doReturn(Futures.successful(new Object())).when(mockActorContext).
260                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
261
262         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
263
264         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
265     }
266
267     @Test(expected = TestException.class)
268     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.failed(new TestException())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
277     }
278
279     @Test
280     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
282
283         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
284
285         expectBatchedModifications(actorRef, 1);
286
287         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
288                 eq(actorSelection(actorRef)), eqSerializedDataExists());
289
290         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
291
292         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
293
294         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
295
296         assertEquals("Exists response", true, exists);
297
298         InOrder inOrder = Mockito.inOrder(mockActorContext);
299         inOrder.verify(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
301
302         inOrder.verify(mockActorContext).executeOperationAsync(
303                 eq(actorSelection(actorRef)), eqSerializedDataExists());
304     }
305
306     @Test(expected=IllegalStateException.class)
307     public void testExistsPreConditionCheck() {
308         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
309         transactionProxy.exists(TestModel.TEST_PATH);
310     }
311
312     @Test
313     public void testWrite() throws Exception {
314         dataStoreContextBuilder.shardBatchedModificationCount(1);
315         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
316
317         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
318
319         expectBatchedModifications(actorRef, 1);
320
321         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
322
323         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
324
325         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
326     }
327
328     @Test
329     public void testWriteAfterAsyncRead() throws Throwable {
330         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
331
332         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
333         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
334                 eq(getSystem().actorSelection(actorRef.path())),
335                 eqCreateTransaction(memberName, READ_WRITE));
336
337         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
338                 eq(actorSelection(actorRef)), eqSerializedReadData());
339
340         expectBatchedModificationsReady(actorRef);
341
342         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
343
344         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
345
346         final CountDownLatch readComplete = new CountDownLatch(1);
347         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
348         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
349                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
350                     @Override
351                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
352                         try {
353                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
354                         } catch (Exception e) {
355                             caughtEx.set(e);
356                         } finally {
357                             readComplete.countDown();
358                         }
359                     }
360
361                     @Override
362                     public void onFailure(Throwable t) {
363                         caughtEx.set(t);
364                         readComplete.countDown();
365                     }
366                 });
367
368         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
369
370         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
371
372         if(caughtEx.get() != null) {
373             throw caughtEx.get();
374         }
375
376         // This sends the batched modification.
377         transactionProxy.ready();
378
379         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
380     }
381
382     @Test(expected=IllegalStateException.class)
383     public void testWritePreConditionCheck() {
384         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
385         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
386     }
387
388     @Test(expected=IllegalStateException.class)
389     public void testWriteAfterReadyPreConditionCheck() {
390         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
391
392         transactionProxy.ready();
393
394         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
395     }
396
397     @Test
398     public void testMerge() throws Exception {
399         dataStoreContextBuilder.shardBatchedModificationCount(1);
400         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
401
402         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
403
404         expectBatchedModifications(actorRef, 1);
405
406         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
407
408         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
409
410         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
411     }
412
413     @Test
414     public void testDelete() throws Exception {
415         dataStoreContextBuilder.shardBatchedModificationCount(1);
416         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
417
418         expectBatchedModifications(actorRef, 1);
419
420         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
421
422         transactionProxy.delete(TestModel.TEST_PATH);
423
424         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
425     }
426
427     @Test
428     public void testReadWrite() throws Exception {
429         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
430
431         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
432
433         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
434                 eq(actorSelection(actorRef)), eqSerializedReadData());
435
436         expectBatchedModifications(actorRef, 1);
437
438         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
439
440         transactionProxy.read(TestModel.TEST_PATH);
441
442         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
443
444         transactionProxy.read(TestModel.TEST_PATH);
445
446         transactionProxy.read(TestModel.TEST_PATH);
447
448         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
449         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
450
451         verifyBatchedModifications(batchedModifications.get(0), false,
452                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
453     }
454
455     @Test
456     public void testReadyWithReadWrite() throws Exception {
457         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
458
459         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
460
461         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
462                 eq(actorSelection(actorRef)), eqSerializedReadData());
463
464         expectBatchedModificationsReady(actorRef, true);
465
466         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
467
468         transactionProxy.read(TestModel.TEST_PATH);
469
470         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
471
472         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
473
474         assertTrue(ready instanceof SingleCommitCohortProxy);
475
476         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
477
478         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
479         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
480
481         verifyBatchedModifications(batchedModifications.get(0), true, true,
482                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
483
484         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
485     }
486
487     @Test
488     public void testReadyWithNoModifications() throws Exception {
489         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
490
491         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
492                 eq(actorSelection(actorRef)), eqSerializedReadData());
493
494         expectBatchedModificationsReady(actorRef, true);
495
496         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
497
498         transactionProxy.read(TestModel.TEST_PATH);
499
500         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
501
502         assertTrue(ready instanceof SingleCommitCohortProxy);
503
504         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
505
506         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
507         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
508
509         verifyBatchedModifications(batchedModifications.get(0), true, true);
510     }
511
512     @Test
513     public void testReadyWithMultipleShardWrites() throws Exception {
514         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
515
516         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
517
518         expectBatchedModificationsReady(actorRef1);
519         expectBatchedModificationsReady(actorRef2);
520
521         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
522
523         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
524         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
525
526         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
527
528         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
529
530         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
531                 actorSelection(actorRef2));
532     }
533
534     @Test
535     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
536         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
537
538         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
539
540         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
541
542         expectBatchedModificationsReady(actorRef, true);
543
544         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
545
546         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
547
548         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
549
550         assertTrue(ready instanceof SingleCommitCohortProxy);
551
552         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
553
554         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
555         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
556
557         verifyBatchedModifications(batchedModifications.get(0), true, true,
558                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
559
560         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
561                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
562     }
563
564     @Test
565     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
566         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
567         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
568
569         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
570
571         expectBatchedModificationsReady(actorRef, true);
572
573         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
574
575         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
576
577         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
578
579         assertTrue(ready instanceof SingleCommitCohortProxy);
580
581         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
582
583         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
584         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
585
586         verifyBatchedModifications(batchedModifications.get(0), false,
587                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
588
589         verifyBatchedModifications(batchedModifications.get(1), true, true);
590
591         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
592                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
593     }
594
595     @Test
596     public void testReadyWithReplyFailure() throws Exception {
597         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
598
599         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
600
601         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
602
603         expectFailedBatchedModifications(actorRef);
604
605         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
606
607         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
608
609         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
610
611         assertTrue(ready instanceof SingleCommitCohortProxy);
612
613         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
614     }
615
616     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
617         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
618
619         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
620
621         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
622
623         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
624
625         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
626
627         transactionProxy.delete(TestModel.TEST_PATH);
628
629         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
630
631         assertTrue(ready instanceof SingleCommitCohortProxy);
632
633         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
634     }
635
636     @Test
637     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
638         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
639     }
640
641     @Test
642     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
643         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
644     }
645
646     @Test
647     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
648         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
649     }
650
651     @Test
652     public void testReadyWithInvalidReplyMessageType() throws Exception {
653         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
654         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
655
656         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
657
658         doReturn(Futures.successful(new Object())).when(mockActorContext).
659                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
660
661         expectBatchedModificationsReady(actorRef2);
662
663         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
664
665         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
666         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
667
668         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
669
670         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
671
672         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
673                 IllegalArgumentException.class);
674     }
675
676     @Test
677     public void testGetIdentifier() {
678         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
679         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
680
681         Object id = transactionProxy.getIdentifier();
682         assertNotNull("getIdentifier returned null", id);
683         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
684     }
685
686     @Test
687     public void testClose() throws Exception{
688         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
689
690         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
691                 eq(actorSelection(actorRef)), eqSerializedReadData());
692
693         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
694
695         transactionProxy.read(TestModel.TEST_PATH);
696
697         transactionProxy.close();
698
699         verify(mockActorContext).sendOperationAsync(
700                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
701     }
702
703
704     /**
705      * Method to test a local Tx actor. The Tx paths are matched to decide if the
706      * Tx actor is local or not. This is done by mocking the Tx actor path
707      * and the caller paths and ensuring that the paths have the remote-address format
708      *
709      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
710      * the paths returned for the actors for all the tests are not qualified remote paths.
711      * Hence are treated as non-local/remote actors. In short, all tests except
712      * few below run for remote actors
713      *
714      * @throws Exception
715      */
716     @Test
717     public void testLocalTxActorRead() throws Exception {
718         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
719         doReturn(true).when(mockActorContext).isPathLocal(anyString());
720
721         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
722
723         // negative test case with null as the reply
724         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
725             any(ActorSelection.class), eqReadData());
726
727         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
728             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
729
730         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
731
732         // test case with node as read data reply
733         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
734
735         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
736             any(ActorSelection.class), eqReadData());
737
738         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
739
740         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
741
742         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
743
744         // test for local data exists
745         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
746             any(ActorSelection.class), eqDataExists());
747
748         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
749
750         assertEquals("Exists response", true, exists);
751     }
752
753     @Test
754     public void testLocalTxActorReady() throws Exception {
755         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
756         doReturn(true).when(mockActorContext).isPathLocal(anyString());
757
758         expectBatchedModificationsReady(actorRef, true);
759
760         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
761
762         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
763         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
764
765         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
766
767         assertTrue(ready instanceof SingleCommitCohortProxy);
768
769         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
770     }
771
772     private static interface TransactionProxyOperation {
773         void run(TransactionProxy transactionProxy);
774     }
775
776     private void throttleOperation(TransactionProxyOperation operation) {
777         throttleOperation(operation, 1, true);
778     }
779
780     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
781         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
782     }
783
784     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
785         ActorSystem actorSystem = getSystem();
786         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
787
788         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
789
790         doReturn(actorSystem.actorSelection(shardActorRef.path())).
791                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
792
793         if(shardFound) {
794             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
795                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
796         } else {
797             doReturn(Futures.failed(new Exception("not found")))
798                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
799         }
800
801         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
802         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
803                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
804                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
805
806         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
807                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
808                         eqCreateTransaction(memberName, READ_WRITE));
809
810         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
811
812         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
813
814         long start = System.nanoTime();
815
816         operation.run(transactionProxy);
817
818         long end = System.nanoTime();
819
820         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
821         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
822                 expected, (end-start)), (end - start) > expected);
823
824     }
825
826     private void completeOperation(TransactionProxyOperation operation){
827         completeOperation(operation, true);
828     }
829
830     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
831         ActorSystem actorSystem = getSystem();
832         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
833
834         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
835
836         doReturn(actorSystem.actorSelection(shardActorRef.path())).
837                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
838
839         if(shardFound) {
840             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
841                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
842         } else {
843             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
844                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
845         }
846
847         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
848         String actorPath = txActorRef.path().toString();
849         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
850                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
851                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
852
853         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
854
855         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
856                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
857                         eqCreateTransaction(memberName, READ_WRITE));
858
859         doReturn(true).when(mockActorContext).isPathLocal(anyString());
860
861         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
862
863         long start = System.nanoTime();
864
865         operation.run(transactionProxy);
866
867         long end = System.nanoTime();
868
869         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
870         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
871                 expected, (end-start)), (end - start) <= expected);
872     }
873
874     @Test
875     public void testWriteThrottlingWhenShardFound(){
876         dataStoreContextBuilder.shardBatchedModificationCount(1);
877         throttleOperation(new TransactionProxyOperation() {
878             @Override
879             public void run(TransactionProxy transactionProxy) {
880                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
881
882                 expectIncompleteBatchedModifications();
883
884                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
885
886                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
887             }
888         });
889     }
890
891     @Test
892     public void testWriteThrottlingWhenShardNotFound(){
893         // Confirm that there is no throttling when the Shard is not found
894         dataStoreContextBuilder.shardBatchedModificationCount(1);
895         completeOperation(new TransactionProxyOperation() {
896             @Override
897             public void run(TransactionProxy transactionProxy) {
898                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
899
900                 expectBatchedModifications(2);
901
902                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
903
904                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
905             }
906         }, false);
907
908     }
909
910
911     @Test
912     public void testWriteCompletion(){
913         dataStoreContextBuilder.shardBatchedModificationCount(1);
914         completeOperation(new TransactionProxyOperation() {
915             @Override
916             public void run(TransactionProxy transactionProxy) {
917                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
918
919                 expectBatchedModifications(2);
920
921                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
922
923                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
924             }
925         });
926     }
927
928     @Test
929     public void testMergeThrottlingWhenShardFound(){
930         dataStoreContextBuilder.shardBatchedModificationCount(1);
931         throttleOperation(new TransactionProxyOperation() {
932             @Override
933             public void run(TransactionProxy transactionProxy) {
934                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
935
936                 expectIncompleteBatchedModifications();
937
938                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
939
940                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
941             }
942         });
943     }
944
945     @Test
946     public void testMergeThrottlingWhenShardNotFound(){
947         dataStoreContextBuilder.shardBatchedModificationCount(1);
948         completeOperation(new TransactionProxyOperation() {
949             @Override
950             public void run(TransactionProxy transactionProxy) {
951                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
952
953                 expectBatchedModifications(2);
954
955                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
956
957                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
958             }
959         }, false);
960     }
961
962     @Test
963     public void testMergeCompletion(){
964         dataStoreContextBuilder.shardBatchedModificationCount(1);
965         completeOperation(new TransactionProxyOperation() {
966             @Override
967             public void run(TransactionProxy transactionProxy) {
968                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
969
970                 expectBatchedModifications(2);
971
972                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
973
974                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
975             }
976         });
977
978     }
979
980     @Test
981     public void testDeleteThrottlingWhenShardFound(){
982
983         throttleOperation(new TransactionProxyOperation() {
984             @Override
985             public void run(TransactionProxy transactionProxy) {
986                 expectIncompleteBatchedModifications();
987
988                 transactionProxy.delete(TestModel.TEST_PATH);
989
990                 transactionProxy.delete(TestModel.TEST_PATH);
991             }
992         });
993     }
994
995
996     @Test
997     public void testDeleteThrottlingWhenShardNotFound(){
998
999         completeOperation(new TransactionProxyOperation() {
1000             @Override
1001             public void run(TransactionProxy transactionProxy) {
1002                 expectBatchedModifications(2);
1003
1004                 transactionProxy.delete(TestModel.TEST_PATH);
1005
1006                 transactionProxy.delete(TestModel.TEST_PATH);
1007             }
1008         }, false);
1009     }
1010
1011     @Test
1012     public void testDeleteCompletion(){
1013         dataStoreContextBuilder.shardBatchedModificationCount(1);
1014         completeOperation(new TransactionProxyOperation() {
1015             @Override
1016             public void run(TransactionProxy transactionProxy) {
1017                 expectBatchedModifications(2);
1018
1019                 transactionProxy.delete(TestModel.TEST_PATH);
1020
1021                 transactionProxy.delete(TestModel.TEST_PATH);
1022             }
1023         });
1024
1025     }
1026
1027     @Test
1028     public void testReadThrottlingWhenShardFound(){
1029
1030         throttleOperation(new TransactionProxyOperation() {
1031             @Override
1032             public void run(TransactionProxy transactionProxy) {
1033                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1034                         any(ActorSelection.class), eqReadData());
1035
1036                 transactionProxy.read(TestModel.TEST_PATH);
1037
1038                 transactionProxy.read(TestModel.TEST_PATH);
1039             }
1040         });
1041     }
1042
1043     @Test
1044     public void testReadThrottlingWhenShardNotFound(){
1045
1046         completeOperation(new TransactionProxyOperation() {
1047             @Override
1048             public void run(TransactionProxy transactionProxy) {
1049                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1050                         any(ActorSelection.class), eqReadData());
1051
1052                 transactionProxy.read(TestModel.TEST_PATH);
1053
1054                 transactionProxy.read(TestModel.TEST_PATH);
1055             }
1056         }, false);
1057     }
1058
1059
1060     @Test
1061     public void testReadCompletion(){
1062         completeOperation(new TransactionProxyOperation() {
1063             @Override
1064             public void run(TransactionProxy transactionProxy) {
1065                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1066
1067                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1068                         any(ActorSelection.class), eqReadData());
1069
1070                 transactionProxy.read(TestModel.TEST_PATH);
1071
1072                 transactionProxy.read(TestModel.TEST_PATH);
1073             }
1074         });
1075
1076     }
1077
1078     @Test
1079     public void testExistsThrottlingWhenShardFound(){
1080
1081         throttleOperation(new TransactionProxyOperation() {
1082             @Override
1083             public void run(TransactionProxy transactionProxy) {
1084                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1085                         any(ActorSelection.class), eqDataExists());
1086
1087                 transactionProxy.exists(TestModel.TEST_PATH);
1088
1089                 transactionProxy.exists(TestModel.TEST_PATH);
1090             }
1091         });
1092     }
1093
1094     @Test
1095     public void testExistsThrottlingWhenShardNotFound(){
1096
1097         completeOperation(new TransactionProxyOperation() {
1098             @Override
1099             public void run(TransactionProxy transactionProxy) {
1100                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1101                         any(ActorSelection.class), eqDataExists());
1102
1103                 transactionProxy.exists(TestModel.TEST_PATH);
1104
1105                 transactionProxy.exists(TestModel.TEST_PATH);
1106             }
1107         }, false);
1108     }
1109
1110
1111     @Test
1112     public void testExistsCompletion(){
1113         completeOperation(new TransactionProxyOperation() {
1114             @Override
1115             public void run(TransactionProxy transactionProxy) {
1116                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1117                         any(ActorSelection.class), eqDataExists());
1118
1119                 transactionProxy.exists(TestModel.TEST_PATH);
1120
1121                 transactionProxy.exists(TestModel.TEST_PATH);
1122             }
1123         });
1124
1125     }
1126
1127     @Test
1128     public void testReadyThrottling(){
1129
1130         throttleOperation(new TransactionProxyOperation() {
1131             @Override
1132             public void run(TransactionProxy transactionProxy) {
1133                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1134
1135                 expectBatchedModifications(1);
1136
1137                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1138                         any(ActorSelection.class), any(ReadyTransaction.class));
1139
1140                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1141
1142                 transactionProxy.ready();
1143             }
1144         });
1145     }
1146
1147     @Test
1148     public void testReadyThrottlingWithTwoTransactionContexts(){
1149
1150         throttleOperation(new TransactionProxyOperation() {
1151             @Override
1152             public void run(TransactionProxy transactionProxy) {
1153                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1154                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1155
1156                 expectBatchedModifications(2);
1157
1158                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1159                         any(ActorSelection.class), any(ReadyTransaction.class));
1160
1161                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1162
1163                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1164
1165                 transactionProxy.ready();
1166             }
1167         }, 2, true);
1168     }
1169
1170     private void testModificationOperationBatching(TransactionType type) throws Exception {
1171         int shardBatchedModificationCount = 3;
1172         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1173
1174         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1175
1176         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1177
1178         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1179         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1180
1181         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1182         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1183
1184         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1185         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1186
1187         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1188         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1189
1190         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1191         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1192
1193         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1194         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1195
1196         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1197         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1198
1199         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1200
1201         transactionProxy.write(writePath1, writeNode1);
1202         transactionProxy.write(writePath2, writeNode2);
1203         transactionProxy.delete(deletePath1);
1204         transactionProxy.merge(mergePath1, mergeNode1);
1205         transactionProxy.merge(mergePath2, mergeNode2);
1206         transactionProxy.write(writePath3, writeNode3);
1207         transactionProxy.merge(mergePath3, mergeNode3);
1208         transactionProxy.delete(deletePath2);
1209
1210         // This sends the last batch.
1211         transactionProxy.ready();
1212
1213         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1214         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1215
1216         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1217                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1218
1219         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1220                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1221
1222         verifyBatchedModifications(batchedModifications.get(2), true, true,
1223                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1224
1225         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1226     }
1227
1228     @Test
1229     public void testReadWriteModificationOperationBatching() throws Throwable {
1230         testModificationOperationBatching(READ_WRITE);
1231     }
1232
1233     @Test
1234     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1235         testModificationOperationBatching(WRITE_ONLY);
1236     }
1237
1238     @Test
1239     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1240         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1241         testModificationOperationBatching(WRITE_ONLY);
1242     }
1243
1244     @Test
1245     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1246
1247         int shardBatchedModificationCount = 10;
1248         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1249
1250         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1251
1252         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1253
1254         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1255         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1256
1257         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1258         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1259
1260         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1261         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1262
1263         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1264         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1265
1266         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1267
1268         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1269                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1270
1271         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1272                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1273
1274         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1275                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1276
1277         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1278
1279         transactionProxy.write(writePath1, writeNode1);
1280         transactionProxy.write(writePath2, writeNode2);
1281
1282         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1283                 get(5, TimeUnit.SECONDS);
1284
1285         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1286         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1287
1288         transactionProxy.merge(mergePath1, mergeNode1);
1289         transactionProxy.merge(mergePath2, mergeNode2);
1290
1291         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1292
1293         transactionProxy.delete(deletePath);
1294
1295         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1296         assertEquals("Exists response", true, exists);
1297
1298         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1299         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1300
1301         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1302         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1303
1304         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1305                 new WriteModification(writePath2, writeNode2));
1306
1307         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1308                 new MergeModification(mergePath2, mergeNode2));
1309
1310         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1311
1312         InOrder inOrder = Mockito.inOrder(mockActorContext);
1313         inOrder.verify(mockActorContext).executeOperationAsync(
1314                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1315
1316         inOrder.verify(mockActorContext).executeOperationAsync(
1317                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1318
1319         inOrder.verify(mockActorContext).executeOperationAsync(
1320                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1321
1322         inOrder.verify(mockActorContext).executeOperationAsync(
1323                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1324
1325         inOrder.verify(mockActorContext).executeOperationAsync(
1326                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1327
1328         inOrder.verify(mockActorContext).executeOperationAsync(
1329                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1330     }
1331
1332     @Test
1333     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1334
1335         SchemaContext schemaContext = SchemaContextHelper.full();
1336         Configuration configuration = mock(Configuration.class);
1337         doReturn(configuration).when(mockActorContext).getConfiguration();
1338         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1339         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1340
1341         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1342         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1343
1344         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1345         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1346
1347         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1348
1349         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1350
1351         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1352
1353         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1354
1355         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1356                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1357
1358         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1359
1360         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1361
1362         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1363
1364         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1365
1366         for(NormalizedNode<?,?> node : collection){
1367             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1368         }
1369
1370         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1371                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1372
1373         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1374
1375         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1376                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1377
1378         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1379     }
1380
1381
1382     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1383         ActorSystem actorSystem = getSystem();
1384         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1385
1386         doReturn(getSystem().actorSelection(shardActorRef.path())).
1387                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1388
1389         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1390                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1391
1392         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1393
1394         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1395
1396         doReturn(actorSystem.actorSelection(txActorRef.path())).
1397                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1398
1399         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1400                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1401                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1402
1403         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1404                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1405     }
1406 }