d37a9db394cc3eff2ca1e01216f6df062c873f32
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
46 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
47 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
48 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
50 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
51 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
52 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import scala.concurrent.Promise;
65
66 @SuppressWarnings("resource")
67 public class TransactionProxyTest extends AbstractTransactionProxyTest {
68
69     @SuppressWarnings("serial")
70     static class TestException extends RuntimeException {
71     }
72
73     static interface Invoker {
74         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
75     }
76
77     @Test
78     public void testRead() throws Exception {
79         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
80
81         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
82
83         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
84                 eq(actorSelection(actorRef)), eqSerializedReadData());
85
86         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
87                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
88
89         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
90
91         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
92
93         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
94                 eq(actorSelection(actorRef)), eqSerializedReadData());
95
96         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
97
98         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
99
100         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
101     }
102
103     @Test(expected = ReadFailedException.class)
104     public void testReadWithInvalidReplyMessageType() throws Exception {
105         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
106
107         doReturn(Futures.successful(new Object())).when(mockActorContext).
108                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
109
110         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
111
112         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
113     }
114
115     @Test(expected = TestException.class)
116     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
117         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
118
119         doReturn(Futures.failed(new TestException())).when(mockActorContext).
120                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
121
122         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
123
124         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
125     }
126
127     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
128             throws Throwable {
129         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
130
131         if (exToThrow instanceof PrimaryNotFoundException) {
132             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
133         } else {
134             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
135                     when(mockActorContext).findPrimaryShardAsync(anyString());
136         }
137
138         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
139                 any(ActorSelection.class), any());
140
141         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
142
143         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
144     }
145
146     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
147         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
148             @Override
149             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
150                 return proxy.read(TestModel.TEST_PATH);
151             }
152         });
153     }
154
155     @Test(expected = PrimaryNotFoundException.class)
156     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
157         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
158     }
159
160     @Test(expected = TimeoutException.class)
161     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
162         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
163                 new Exception("reason")));
164     }
165
166     @Test(expected = TestException.class)
167     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new TestException());
169     }
170
171     @Test
172     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
173         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
174
175         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
176
177         expectBatchedModifications(actorRef, 1);
178
179         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
180                 eq(actorSelection(actorRef)), eqSerializedReadData());
181
182         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
183
184         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
185
186         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
187                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
188
189         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
190         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
191
192         InOrder inOrder = Mockito.inOrder(mockActorContext);
193         inOrder.verify(mockActorContext).executeOperationAsync(
194                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
195
196         inOrder.verify(mockActorContext).executeOperationAsync(
197                 eq(actorSelection(actorRef)), eqSerializedReadData());
198     }
199
200     @Test(expected=IllegalStateException.class)
201     public void testReadPreConditionCheck() {
202         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
203         transactionProxy.read(TestModel.TEST_PATH);
204     }
205
206     @Test(expected=IllegalArgumentException.class)
207     public void testInvalidCreateTransactionReply() throws Throwable {
208         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
209
210         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
211             actorSelection(actorRef.path().toString());
212
213         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
214             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
215
216         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
217             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
218
219         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
220
221         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
222     }
223
224     @Test
225     public void testExists() throws Exception {
226         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
227
228         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
229
230         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
231                 eq(actorSelection(actorRef)), eqSerializedDataExists());
232
233         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
234
235         assertEquals("Exists response", false, exists);
236
237         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
238                 eq(actorSelection(actorRef)), eqSerializedDataExists());
239
240         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
241
242         assertEquals("Exists response", true, exists);
243     }
244
245     @Test(expected = PrimaryNotFoundException.class)
246     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
247         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
248             @Override
249             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
250                 return proxy.exists(TestModel.TEST_PATH);
251             }
252         });
253     }
254
255     @Test(expected = ReadFailedException.class)
256     public void testExistsWithInvalidReplyMessageType() throws Exception {
257         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
258
259         doReturn(Futures.successful(new Object())).when(mockActorContext).
260                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
261
262         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
263                 READ_ONLY);
264
265         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
266     }
267
268     @Test(expected = TestException.class)
269     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
270         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
271
272         doReturn(Futures.failed(new TestException())).when(mockActorContext).
273                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
274
275         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
276
277         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
278     }
279
280     @Test
281     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
282         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
283
284         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
285
286         expectBatchedModifications(actorRef, 1);
287
288         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
289                 eq(actorSelection(actorRef)), eqSerializedDataExists());
290
291         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
292
293         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
294
295         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
296
297         assertEquals("Exists response", true, exists);
298
299         InOrder inOrder = Mockito.inOrder(mockActorContext);
300         inOrder.verify(mockActorContext).executeOperationAsync(
301                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
302
303         inOrder.verify(mockActorContext).executeOperationAsync(
304                 eq(actorSelection(actorRef)), eqSerializedDataExists());
305     }
306
307     @Test(expected=IllegalStateException.class)
308     public void testExistsPreConditionCheck() {
309         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
310         transactionProxy.exists(TestModel.TEST_PATH);
311     }
312
313     @Test
314     public void testWrite() throws Exception {
315         dataStoreContextBuilder.shardBatchedModificationCount(1);
316         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
317
318         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
319
320         expectBatchedModifications(actorRef, 1);
321
322         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
323
324         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
325
326         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
327     }
328
329     @Test
330     public void testWriteAfterAsyncRead() throws Throwable {
331         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
332
333         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
334         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
335                 eq(getSystem().actorSelection(actorRef.path())),
336                 eqCreateTransaction(memberName, READ_WRITE));
337
338         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
339                 eq(actorSelection(actorRef)), eqSerializedReadData());
340
341         expectBatchedModificationsReady(actorRef);
342
343         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
344
345         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
346
347         final CountDownLatch readComplete = new CountDownLatch(1);
348         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
349         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
350                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
351                     @Override
352                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
353                         try {
354                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
355                         } catch (Exception e) {
356                             caughtEx.set(e);
357                         } finally {
358                             readComplete.countDown();
359                         }
360                     }
361
362                     @Override
363                     public void onFailure(Throwable t) {
364                         caughtEx.set(t);
365                         readComplete.countDown();
366                     }
367                 });
368
369         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
370
371         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
372
373         if(caughtEx.get() != null) {
374             throw caughtEx.get();
375         }
376
377         // This sends the batched modification.
378         transactionProxy.ready();
379
380         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
381     }
382
383     @Test(expected=IllegalStateException.class)
384     public void testWritePreConditionCheck() {
385         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
386         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
387     }
388
389     @Test(expected=IllegalStateException.class)
390     public void testWriteAfterReadyPreConditionCheck() {
391         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
392
393         transactionProxy.ready();
394
395         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
396     }
397
398     @Test
399     public void testMerge() throws Exception {
400         dataStoreContextBuilder.shardBatchedModificationCount(1);
401         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
402
403         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
404
405         expectBatchedModifications(actorRef, 1);
406
407         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
408
409         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
410
411         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
412     }
413
414     @Test
415     public void testDelete() throws Exception {
416         dataStoreContextBuilder.shardBatchedModificationCount(1);
417         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
418
419         expectBatchedModifications(actorRef, 1);
420
421         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
422
423         transactionProxy.delete(TestModel.TEST_PATH);
424
425         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
426     }
427
428     @Test
429     public void testReadWrite() throws Exception {
430         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
431
432         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
433
434         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
435                 eq(actorSelection(actorRef)), eqSerializedReadData());
436
437         expectBatchedModifications(actorRef, 1);
438
439         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
440
441         transactionProxy.read(TestModel.TEST_PATH);
442
443         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
444
445         transactionProxy.read(TestModel.TEST_PATH);
446
447         transactionProxy.read(TestModel.TEST_PATH);
448
449         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
450         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
451
452         verifyBatchedModifications(batchedModifications.get(0), false,
453                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
454     }
455
456     @Test
457     public void testReadyWithReadWrite() throws Exception {
458         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
459
460         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
461
462         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
463                 eq(actorSelection(actorRef)), eqSerializedReadData());
464
465         expectBatchedModificationsReady(actorRef, true);
466
467         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
468
469         transactionProxy.read(TestModel.TEST_PATH);
470
471         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
472
473         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
474
475         assertTrue(ready instanceof SingleCommitCohortProxy);
476
477         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
478
479         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
480         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
481
482         verifyBatchedModifications(batchedModifications.get(0), true, true,
483                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
484
485         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
486     }
487
488     @Test
489     public void testReadyWithNoModifications() throws Exception {
490         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
491
492         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
493                 eq(actorSelection(actorRef)), eqSerializedReadData());
494
495         expectBatchedModificationsReady(actorRef, true);
496
497         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
498
499         transactionProxy.read(TestModel.TEST_PATH);
500
501         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
502
503         assertTrue(ready instanceof SingleCommitCohortProxy);
504
505         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
506
507         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
508         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
509
510         verifyBatchedModifications(batchedModifications.get(0), true, true);
511     }
512
513     @Test
514     public void testReadyWithMultipleShardWrites() throws Exception {
515         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
516
517         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
518
519         expectBatchedModificationsReady(actorRef1);
520         expectBatchedModificationsReady(actorRef2);
521
522         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
523
524         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
525         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
526
527         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
528
529         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
530
531         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
532                 actorSelection(actorRef2));
533     }
534
535     @Test
536     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
537         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
538
539         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
540
541         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
542
543         expectBatchedModificationsReady(actorRef, true);
544
545         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
546
547         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
548
549         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
550
551         assertTrue(ready instanceof SingleCommitCohortProxy);
552
553         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
554
555         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
556         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
557
558         verifyBatchedModifications(batchedModifications.get(0), true, true,
559                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
560
561         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
562                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
563     }
564
565     @Test
566     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
567         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
568         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
569
570         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
571
572         expectBatchedModificationsReady(actorRef, true);
573
574         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
575
576         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
577
578         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
579
580         assertTrue(ready instanceof SingleCommitCohortProxy);
581
582         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
583
584         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
585         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
586
587         verifyBatchedModifications(batchedModifications.get(0), false,
588                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
589
590         verifyBatchedModifications(batchedModifications.get(1), true, true);
591
592         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
593                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
594     }
595
596     @Test
597     public void testReadyWithReplyFailure() throws Exception {
598         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
599
600         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
601
602         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
603
604         expectFailedBatchedModifications(actorRef);
605
606         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
607
608         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
609
610         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
611
612         assertTrue(ready instanceof SingleCommitCohortProxy);
613
614         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
615     }
616
617     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
618         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
619
620         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
621
622         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
623
624         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
625
626         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
627
628         transactionProxy.delete(TestModel.TEST_PATH);
629
630         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
631
632         assertTrue(ready instanceof SingleCommitCohortProxy);
633
634         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
635     }
636
637     @Test
638     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
639         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
640     }
641
642     @Test
643     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
644         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
645     }
646
647     @Test
648     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
649         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
650     }
651
652     @Test
653     public void testReadyWithInvalidReplyMessageType() throws Exception {
654         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
655         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
656
657         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
658
659         doReturn(Futures.successful(new Object())).when(mockActorContext).
660                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
661
662         expectBatchedModificationsReady(actorRef2);
663
664         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
665
666         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
667         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
668
669         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
670
671         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
672
673         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
674                 IllegalArgumentException.class);
675     }
676
677     @Test
678     public void testGetIdentifier() {
679         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
680         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
681                 TransactionType.READ_ONLY);
682
683         Object id = transactionProxy.getIdentifier();
684         assertNotNull("getIdentifier returned null", id);
685         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
686     }
687
688     @Test
689     public void testClose() throws Exception{
690         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
691
692         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
693                 eq(actorSelection(actorRef)), eqSerializedReadData());
694
695         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
696
697         transactionProxy.read(TestModel.TEST_PATH);
698
699         transactionProxy.close();
700
701         verify(mockActorContext).sendOperationAsync(
702                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
703     }
704
705
706     /**
707      * Method to test a local Tx actor. The Tx paths are matched to decide if the
708      * Tx actor is local or not. This is done by mocking the Tx actor path
709      * and the caller paths and ensuring that the paths have the remote-address format
710      *
711      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
712      * the paths returned for the actors for all the tests are not qualified remote paths.
713      * Hence are treated as non-local/remote actors. In short, all tests except
714      * few below run for remote actors
715      *
716      * @throws Exception
717      */
718     @Test
719     public void testLocalTxActorRead() throws Exception {
720         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
721         doReturn(true).when(mockActorContext).isPathLocal(anyString());
722
723         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
724
725         // negative test case with null as the reply
726         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
727             any(ActorSelection.class), eqReadData());
728
729         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
730             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
731
732         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
733
734         // test case with node as read data reply
735         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736
737         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
738             any(ActorSelection.class), eqReadData());
739
740         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
741
742         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
743
744         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
745
746         // test for local data exists
747         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
748             any(ActorSelection.class), eqDataExists());
749
750         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
751
752         assertEquals("Exists response", true, exists);
753     }
754
755     @Test
756     public void testLocalTxActorReady() throws Exception {
757         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
758         doReturn(true).when(mockActorContext).isPathLocal(anyString());
759
760         expectBatchedModificationsReady(actorRef, true);
761
762         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
763
764         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
765         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
766
767         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
768
769         assertTrue(ready instanceof SingleCommitCohortProxy);
770
771         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
772     }
773
774     private static interface TransactionProxyOperation {
775         void run(TransactionProxy transactionProxy);
776     }
777
778     private void throttleOperation(TransactionProxyOperation operation) {
779         throttleOperation(operation, 1, true);
780     }
781
782     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
783         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
784     }
785
786     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
787         ActorSystem actorSystem = getSystem();
788         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
789
790         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
791
792         doReturn(actorSystem.actorSelection(shardActorRef.path())).
793                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
794
795         if(shardFound) {
796             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
797                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
798         } else {
799             doReturn(Futures.failed(new Exception("not found")))
800                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
801         }
802
803         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
804         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
805                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
806                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
807
808         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
809                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
810                         eqCreateTransaction(memberName, READ_WRITE));
811
812         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
813
814         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
815
816         long start = System.nanoTime();
817
818         operation.run(transactionProxy);
819
820         long end = System.nanoTime();
821
822         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
823         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
824                 expected, (end-start)), (end - start) > expected);
825
826     }
827
828     private void completeOperation(TransactionProxyOperation operation){
829         completeOperation(operation, true);
830     }
831
832     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
833         ActorSystem actorSystem = getSystem();
834         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
835
836         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
837
838         doReturn(actorSystem.actorSelection(shardActorRef.path())).
839                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
840
841         if(shardFound) {
842             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
843                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
844         } else {
845             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
846                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
847         }
848
849         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
850         String actorPath = txActorRef.path().toString();
851         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
852                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
853                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
854
855         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
856
857         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
858                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
859                         eqCreateTransaction(memberName, READ_WRITE));
860
861         doReturn(true).when(mockActorContext).isPathLocal(anyString());
862
863         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
864
865         long start = System.nanoTime();
866
867         operation.run(transactionProxy);
868
869         long end = System.nanoTime();
870
871         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
872         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
873                 expected, (end-start)), (end - start) <= expected);
874     }
875
876     @Test
877     public void testWriteThrottlingWhenShardFound(){
878         dataStoreContextBuilder.shardBatchedModificationCount(1);
879         throttleOperation(new TransactionProxyOperation() {
880             @Override
881             public void run(TransactionProxy transactionProxy) {
882                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
883
884                 expectIncompleteBatchedModifications();
885
886                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
887
888                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
889             }
890         });
891     }
892
893     @Test
894     public void testWriteThrottlingWhenShardNotFound(){
895         // Confirm that there is no throttling when the Shard is not found
896         dataStoreContextBuilder.shardBatchedModificationCount(1);
897         completeOperation(new TransactionProxyOperation() {
898             @Override
899             public void run(TransactionProxy transactionProxy) {
900                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
901
902                 expectBatchedModifications(2);
903
904                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
905
906                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
907             }
908         }, false);
909
910     }
911
912
913     @Test
914     public void testWriteCompletion(){
915         dataStoreContextBuilder.shardBatchedModificationCount(1);
916         completeOperation(new TransactionProxyOperation() {
917             @Override
918             public void run(TransactionProxy transactionProxy) {
919                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
920
921                 expectBatchedModifications(2);
922
923                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
924
925                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
926             }
927         });
928     }
929
930     @Test
931     public void testMergeThrottlingWhenShardFound(){
932         dataStoreContextBuilder.shardBatchedModificationCount(1);
933         throttleOperation(new TransactionProxyOperation() {
934             @Override
935             public void run(TransactionProxy transactionProxy) {
936                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
937
938                 expectIncompleteBatchedModifications();
939
940                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
941
942                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
943             }
944         });
945     }
946
947     @Test
948     public void testMergeThrottlingWhenShardNotFound(){
949         dataStoreContextBuilder.shardBatchedModificationCount(1);
950         completeOperation(new TransactionProxyOperation() {
951             @Override
952             public void run(TransactionProxy transactionProxy) {
953                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
954
955                 expectBatchedModifications(2);
956
957                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
958
959                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
960             }
961         }, false);
962     }
963
964     @Test
965     public void testMergeCompletion(){
966         dataStoreContextBuilder.shardBatchedModificationCount(1);
967         completeOperation(new TransactionProxyOperation() {
968             @Override
969             public void run(TransactionProxy transactionProxy) {
970                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
971
972                 expectBatchedModifications(2);
973
974                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
975
976                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
977             }
978         });
979
980     }
981
982     @Test
983     public void testDeleteThrottlingWhenShardFound(){
984
985         throttleOperation(new TransactionProxyOperation() {
986             @Override
987             public void run(TransactionProxy transactionProxy) {
988                 expectIncompleteBatchedModifications();
989
990                 transactionProxy.delete(TestModel.TEST_PATH);
991
992                 transactionProxy.delete(TestModel.TEST_PATH);
993             }
994         });
995     }
996
997
998     @Test
999     public void testDeleteThrottlingWhenShardNotFound(){
1000
1001         completeOperation(new TransactionProxyOperation() {
1002             @Override
1003             public void run(TransactionProxy transactionProxy) {
1004                 expectBatchedModifications(2);
1005
1006                 transactionProxy.delete(TestModel.TEST_PATH);
1007
1008                 transactionProxy.delete(TestModel.TEST_PATH);
1009             }
1010         }, false);
1011     }
1012
1013     @Test
1014     public void testDeleteCompletion(){
1015         dataStoreContextBuilder.shardBatchedModificationCount(1);
1016         completeOperation(new TransactionProxyOperation() {
1017             @Override
1018             public void run(TransactionProxy transactionProxy) {
1019                 expectBatchedModifications(2);
1020
1021                 transactionProxy.delete(TestModel.TEST_PATH);
1022
1023                 transactionProxy.delete(TestModel.TEST_PATH);
1024             }
1025         });
1026
1027     }
1028
1029     @Test
1030     public void testReadThrottlingWhenShardFound(){
1031
1032         throttleOperation(new TransactionProxyOperation() {
1033             @Override
1034             public void run(TransactionProxy transactionProxy) {
1035                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1036                         any(ActorSelection.class), eqReadData());
1037
1038                 transactionProxy.read(TestModel.TEST_PATH);
1039
1040                 transactionProxy.read(TestModel.TEST_PATH);
1041             }
1042         });
1043     }
1044
1045     @Test
1046     public void testReadThrottlingWhenShardNotFound(){
1047
1048         completeOperation(new TransactionProxyOperation() {
1049             @Override
1050             public void run(TransactionProxy transactionProxy) {
1051                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1052                         any(ActorSelection.class), eqReadData());
1053
1054                 transactionProxy.read(TestModel.TEST_PATH);
1055
1056                 transactionProxy.read(TestModel.TEST_PATH);
1057             }
1058         }, false);
1059     }
1060
1061
1062     @Test
1063     public void testReadCompletion(){
1064         completeOperation(new TransactionProxyOperation() {
1065             @Override
1066             public void run(TransactionProxy transactionProxy) {
1067                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1068
1069                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1070                         any(ActorSelection.class), eqReadData());
1071
1072                 transactionProxy.read(TestModel.TEST_PATH);
1073
1074                 transactionProxy.read(TestModel.TEST_PATH);
1075             }
1076         });
1077
1078     }
1079
1080     @Test
1081     public void testExistsThrottlingWhenShardFound(){
1082
1083         throttleOperation(new TransactionProxyOperation() {
1084             @Override
1085             public void run(TransactionProxy transactionProxy) {
1086                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1087                         any(ActorSelection.class), eqDataExists());
1088
1089                 transactionProxy.exists(TestModel.TEST_PATH);
1090
1091                 transactionProxy.exists(TestModel.TEST_PATH);
1092             }
1093         });
1094     }
1095
1096     @Test
1097     public void testExistsThrottlingWhenShardNotFound(){
1098
1099         completeOperation(new TransactionProxyOperation() {
1100             @Override
1101             public void run(TransactionProxy transactionProxy) {
1102                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1103                         any(ActorSelection.class), eqDataExists());
1104
1105                 transactionProxy.exists(TestModel.TEST_PATH);
1106
1107                 transactionProxy.exists(TestModel.TEST_PATH);
1108             }
1109         }, false);
1110     }
1111
1112
1113     @Test
1114     public void testExistsCompletion(){
1115         completeOperation(new TransactionProxyOperation() {
1116             @Override
1117             public void run(TransactionProxy transactionProxy) {
1118                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1119                         any(ActorSelection.class), eqDataExists());
1120
1121                 transactionProxy.exists(TestModel.TEST_PATH);
1122
1123                 transactionProxy.exists(TestModel.TEST_PATH);
1124             }
1125         });
1126
1127     }
1128
1129     @Test
1130     public void testReadyThrottling(){
1131
1132         throttleOperation(new TransactionProxyOperation() {
1133             @Override
1134             public void run(TransactionProxy transactionProxy) {
1135                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1136
1137                 expectBatchedModifications(1);
1138
1139                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1140                         any(ActorSelection.class), any(ReadyTransaction.class));
1141
1142                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1143
1144                 transactionProxy.ready();
1145             }
1146         });
1147     }
1148
1149     @Test
1150     public void testReadyThrottlingWithTwoTransactionContexts(){
1151
1152         throttleOperation(new TransactionProxyOperation() {
1153             @Override
1154             public void run(TransactionProxy transactionProxy) {
1155                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1156                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1157
1158                 expectBatchedModifications(2);
1159
1160                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1161                         any(ActorSelection.class), any(ReadyTransaction.class));
1162
1163                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1164
1165                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1166
1167                 transactionProxy.ready();
1168             }
1169         }, 2, true);
1170     }
1171
1172     private void testModificationOperationBatching(TransactionType type) throws Exception {
1173         int shardBatchedModificationCount = 3;
1174         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1175
1176         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1177
1178         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1179
1180         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1181         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182
1183         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1184         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1185
1186         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1187         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1188
1189         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1190         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1191
1192         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1193         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1194
1195         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1196         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1197
1198         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1199         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1200
1201         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1202
1203         transactionProxy.write(writePath1, writeNode1);
1204         transactionProxy.write(writePath2, writeNode2);
1205         transactionProxy.delete(deletePath1);
1206         transactionProxy.merge(mergePath1, mergeNode1);
1207         transactionProxy.merge(mergePath2, mergeNode2);
1208         transactionProxy.write(writePath3, writeNode3);
1209         transactionProxy.merge(mergePath3, mergeNode3);
1210         transactionProxy.delete(deletePath2);
1211
1212         // This sends the last batch.
1213         transactionProxy.ready();
1214
1215         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1216         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1217
1218         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1219                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1220
1221         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1222                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1223
1224         verifyBatchedModifications(batchedModifications.get(2), true, true,
1225                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1226
1227         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1228     }
1229
1230     @Test
1231     public void testReadWriteModificationOperationBatching() throws Throwable {
1232         testModificationOperationBatching(READ_WRITE);
1233     }
1234
1235     @Test
1236     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1237         testModificationOperationBatching(WRITE_ONLY);
1238     }
1239
1240     @Test
1241     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1242         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1243         testModificationOperationBatching(WRITE_ONLY);
1244     }
1245
1246     @Test
1247     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1248
1249         int shardBatchedModificationCount = 10;
1250         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1251
1252         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1253
1254         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1255
1256         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1257         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1258
1259         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1260         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1261
1262         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1263         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1264
1265         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1266         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1267
1268         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1269
1270         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1271                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1272
1273         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1274                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1275
1276         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1277                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1278
1279         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1280
1281         transactionProxy.write(writePath1, writeNode1);
1282         transactionProxy.write(writePath2, writeNode2);
1283
1284         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1285                 get(5, TimeUnit.SECONDS);
1286
1287         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1288         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1289
1290         transactionProxy.merge(mergePath1, mergeNode1);
1291         transactionProxy.merge(mergePath2, mergeNode2);
1292
1293         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1294
1295         transactionProxy.delete(deletePath);
1296
1297         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1298         assertEquals("Exists response", true, exists);
1299
1300         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1301         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1302
1303         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1304         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1305
1306         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1307                 new WriteModification(writePath2, writeNode2));
1308
1309         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1310                 new MergeModification(mergePath2, mergeNode2));
1311
1312         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1313
1314         InOrder inOrder = Mockito.inOrder(mockActorContext);
1315         inOrder.verify(mockActorContext).executeOperationAsync(
1316                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1317
1318         inOrder.verify(mockActorContext).executeOperationAsync(
1319                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1320
1321         inOrder.verify(mockActorContext).executeOperationAsync(
1322                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1323
1324         inOrder.verify(mockActorContext).executeOperationAsync(
1325                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1326
1327         inOrder.verify(mockActorContext).executeOperationAsync(
1328                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1329
1330         inOrder.verify(mockActorContext).executeOperationAsync(
1331                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1332     }
1333
1334     @Test
1335     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1336
1337         SchemaContext schemaContext = SchemaContextHelper.full();
1338         Configuration configuration = mock(Configuration.class);
1339         doReturn(configuration).when(mockActorContext).getConfiguration();
1340         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1341         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1342
1343         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1344         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1345
1346         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1347         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1348
1349         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1350
1351         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1352
1353         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1354
1355         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1356
1357         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1358                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1359
1360         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1361
1362         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1363
1364         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1365
1366         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1367
1368         for(NormalizedNode<?,?> node : collection){
1369             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1370         }
1371
1372         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1373                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1374
1375         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1376
1377         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1378                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1379
1380         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1381     }
1382
1383
1384     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1385         ActorSystem actorSystem = getSystem();
1386         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1387
1388         doReturn(getSystem().actorSelection(shardActorRef.path())).
1389                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1390
1391         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1392                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1393
1394         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1395
1396         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1397
1398         doReturn(actorSystem.actorSelection(txActorRef.path())).
1399                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1400
1401         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1402                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1403                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1404
1405         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1406                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1407     }
1408 }