CDS: split TransactionType from TransactionProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
46 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
49 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
50 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import scala.concurrent.Promise;
63
64 @SuppressWarnings("resource")
65 public class TransactionProxyTest extends AbstractTransactionProxyTest {
66
67     @SuppressWarnings("serial")
68     static class TestException extends RuntimeException {
69     }
70
71     static interface Invoker {
72         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
73     }
74
75     @Test
76     public void testRead() throws Exception {
77         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
78
79         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
80
81         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
82                 eq(actorSelection(actorRef)), eqSerializedReadData());
83
84         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
85                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
86
87         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
88
89         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
90
91         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
92                 eq(actorSelection(actorRef)), eqSerializedReadData());
93
94         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
95
96         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
97
98         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
99     }
100
101     @Test(expected = ReadFailedException.class)
102     public void testReadWithInvalidReplyMessageType() throws Exception {
103         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
104
105         doReturn(Futures.successful(new Object())).when(mockActorContext).
106                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
107
108         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
109
110         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
111     }
112
113     @Test(expected = TestException.class)
114     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
115         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
116
117         doReturn(Futures.failed(new TestException())).when(mockActorContext).
118                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
119
120         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
121
122         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
123     }
124
125     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
126             throws Throwable {
127         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
128
129         if (exToThrow instanceof PrimaryNotFoundException) {
130             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
131         } else {
132             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
133                     when(mockActorContext).findPrimaryShardAsync(anyString());
134         }
135
136         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
137                 any(ActorSelection.class), any());
138
139         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
140
141         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
142     }
143
144     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
145         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
146             @Override
147             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
148                 return proxy.read(TestModel.TEST_PATH);
149             }
150         });
151     }
152
153     @Test(expected = PrimaryNotFoundException.class)
154     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
155         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
156     }
157
158     @Test(expected = TimeoutException.class)
159     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
160         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
161                 new Exception("reason")));
162     }
163
164     @Test(expected = TestException.class)
165     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
166         testReadWithExceptionOnInitialCreateTransaction(new TestException());
167     }
168
169     @Test
170     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
171         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
172
173         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174
175         expectBatchedModifications(actorRef, 1);
176
177         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
178                 eq(actorSelection(actorRef)), eqSerializedReadData());
179
180         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
181
182         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
183
184         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
185                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
186
187         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
188         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
189
190         InOrder inOrder = Mockito.inOrder(mockActorContext);
191         inOrder.verify(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
193
194         inOrder.verify(mockActorContext).executeOperationAsync(
195                 eq(actorSelection(actorRef)), eqSerializedReadData());
196     }
197
198     @Test(expected=IllegalStateException.class)
199     public void testReadPreConditionCheck() {
200         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
201         transactionProxy.read(TestModel.TEST_PATH);
202     }
203
204     @Test(expected=IllegalArgumentException.class)
205     public void testInvalidCreateTransactionReply() throws Throwable {
206         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
207
208         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
209             actorSelection(actorRef.path().toString());
210
211         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
212             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
213
214         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
215             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
216
217         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
218
219         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
220     }
221
222     @Test
223     public void testExists() throws Exception {
224         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
225
226         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
227
228         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
229                 eq(actorSelection(actorRef)), eqSerializedDataExists());
230
231         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
232
233         assertEquals("Exists response", false, exists);
234
235         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
236                 eq(actorSelection(actorRef)), eqSerializedDataExists());
237
238         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
239
240         assertEquals("Exists response", true, exists);
241     }
242
243     @Test(expected = PrimaryNotFoundException.class)
244     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
245         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
246             @Override
247             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
248                 return proxy.exists(TestModel.TEST_PATH);
249             }
250         });
251     }
252
253     @Test(expected = ReadFailedException.class)
254     public void testExistsWithInvalidReplyMessageType() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
256
257         doReturn(Futures.successful(new Object())).when(mockActorContext).
258                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
259
260         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
261                 READ_ONLY);
262
263         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
264     }
265
266     @Test(expected = TestException.class)
267     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
268         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
269
270         doReturn(Futures.failed(new TestException())).when(mockActorContext).
271                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
272
273         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
274
275         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
276     }
277
278     @Test
279     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
280         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
281
282         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
283
284         expectBatchedModifications(actorRef, 1);
285
286         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
287                 eq(actorSelection(actorRef)), eqSerializedDataExists());
288
289         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
290
291         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
292
293         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
294
295         assertEquals("Exists response", true, exists);
296
297         InOrder inOrder = Mockito.inOrder(mockActorContext);
298         inOrder.verify(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
300
301         inOrder.verify(mockActorContext).executeOperationAsync(
302                 eq(actorSelection(actorRef)), eqSerializedDataExists());
303     }
304
305     @Test(expected=IllegalStateException.class)
306     public void testExistsPreConditionCheck() {
307         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
308         transactionProxy.exists(TestModel.TEST_PATH);
309     }
310
311     @Test
312     public void testWrite() throws Exception {
313         dataStoreContextBuilder.shardBatchedModificationCount(1);
314         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
315
316         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
317
318         expectBatchedModifications(actorRef, 1);
319
320         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
321
322         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
323
324         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
325     }
326
327     @Test
328     public void testWriteAfterAsyncRead() throws Throwable {
329         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
330
331         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
332         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
333                 eq(getSystem().actorSelection(actorRef.path())),
334                 eqCreateTransaction(memberName, READ_WRITE));
335
336         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
337                 eq(actorSelection(actorRef)), eqSerializedReadData());
338
339         expectBatchedModificationsReady(actorRef);
340
341         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
342
343         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
344
345         final CountDownLatch readComplete = new CountDownLatch(1);
346         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
347         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
348                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
349                     @Override
350                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
351                         try {
352                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
353                         } catch (Exception e) {
354                             caughtEx.set(e);
355                         } finally {
356                             readComplete.countDown();
357                         }
358                     }
359
360                     @Override
361                     public void onFailure(Throwable t) {
362                         caughtEx.set(t);
363                         readComplete.countDown();
364                     }
365                 });
366
367         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
368
369         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
370
371         if(caughtEx.get() != null) {
372             throw caughtEx.get();
373         }
374
375         // This sends the batched modification.
376         transactionProxy.ready();
377
378         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
379     }
380
381     @Test(expected=IllegalStateException.class)
382     public void testWritePreConditionCheck() {
383         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
384         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385     }
386
387     @Test(expected=IllegalStateException.class)
388     public void testWriteAfterReadyPreConditionCheck() {
389         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
390
391         transactionProxy.ready();
392
393         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
394     }
395
396     @Test
397     public void testMerge() throws Exception {
398         dataStoreContextBuilder.shardBatchedModificationCount(1);
399         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
400
401         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
402
403         expectBatchedModifications(actorRef, 1);
404
405         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
406
407         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
408
409         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
410     }
411
412     @Test
413     public void testDelete() throws Exception {
414         dataStoreContextBuilder.shardBatchedModificationCount(1);
415         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
416
417         expectBatchedModifications(actorRef, 1);
418
419         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
420
421         transactionProxy.delete(TestModel.TEST_PATH);
422
423         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
424     }
425
426     @Test
427     public void testReadWrite() throws Exception {
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
429
430         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
431
432         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
433                 eq(actorSelection(actorRef)), eqSerializedReadData());
434
435         expectBatchedModifications(actorRef, 1);
436
437         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
438
439         transactionProxy.read(TestModel.TEST_PATH);
440
441         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
442
443         transactionProxy.read(TestModel.TEST_PATH);
444
445         transactionProxy.read(TestModel.TEST_PATH);
446
447         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
448         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
449
450         verifyBatchedModifications(batchedModifications.get(0), false,
451                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
452     }
453
454     @Test
455     public void testReadyWithReadWrite() throws Exception {
456         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
457
458         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459
460         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
461                 eq(actorSelection(actorRef)), eqSerializedReadData());
462
463         expectBatchedModificationsReady(actorRef, true);
464
465         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
466
467         transactionProxy.read(TestModel.TEST_PATH);
468
469         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
470
471         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
472
473         assertTrue(ready instanceof SingleCommitCohortProxy);
474
475         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
476
477         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
478         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
479
480         verifyBatchedModifications(batchedModifications.get(0), true, true,
481                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
482
483         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
484     }
485
486     @Test
487     public void testReadyWithNoModifications() throws Exception {
488         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
489
490         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
491                 eq(actorSelection(actorRef)), eqSerializedReadData());
492
493         expectBatchedModificationsReady(actorRef, true);
494
495         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
496
497         transactionProxy.read(TestModel.TEST_PATH);
498
499         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
500
501         assertTrue(ready instanceof SingleCommitCohortProxy);
502
503         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
504
505         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
506         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
507
508         verifyBatchedModifications(batchedModifications.get(0), true, true);
509     }
510
511     @Test
512     public void testReadyWithMultipleShardWrites() throws Exception {
513         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
514
515         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
516
517         expectBatchedModificationsReady(actorRef1);
518         expectBatchedModificationsReady(actorRef2);
519
520         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
521
522         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
523         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
524
525         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
526
527         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
528
529         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
530                 actorSelection(actorRef2));
531     }
532
533     @Test
534     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
535         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
536
537         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
538
539         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
540
541         expectBatchedModificationsReady(actorRef, true);
542
543         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
544
545         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
546
547         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
548
549         assertTrue(ready instanceof SingleCommitCohortProxy);
550
551         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
552
553         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
554         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
555
556         verifyBatchedModifications(batchedModifications.get(0), true, true,
557                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
558
559         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
560                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
561     }
562
563     @Test
564     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
565         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
566         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
567
568         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
569
570         expectBatchedModificationsReady(actorRef, true);
571
572         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
573
574         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
575
576         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
577
578         assertTrue(ready instanceof SingleCommitCohortProxy);
579
580         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
581
582         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
583         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
584
585         verifyBatchedModifications(batchedModifications.get(0), false,
586                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
587
588         verifyBatchedModifications(batchedModifications.get(1), true, true);
589
590         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
591                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
592     }
593
594     @Test
595     public void testReadyWithReplyFailure() throws Exception {
596         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
597
598         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
599
600         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
601
602         expectFailedBatchedModifications(actorRef);
603
604         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
605
606         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
607
608         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
609
610         assertTrue(ready instanceof SingleCommitCohortProxy);
611
612         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
613     }
614
615     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
616         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
617
618         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
619
620         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
621
622         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
623
624         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
625
626         transactionProxy.delete(TestModel.TEST_PATH);
627
628         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
629
630         assertTrue(ready instanceof SingleCommitCohortProxy);
631
632         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
633     }
634
635     @Test
636     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
637         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
638     }
639
640     @Test
641     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
642         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
643     }
644
645     @Test
646     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
647         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
648     }
649
650     @Test
651     public void testReadyWithInvalidReplyMessageType() throws Exception {
652         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
653         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
654
655         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
656
657         doReturn(Futures.successful(new Object())).when(mockActorContext).
658                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
659
660         expectBatchedModificationsReady(actorRef2);
661
662         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
663
664         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
665         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
666
667         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
668
669         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
670
671         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
672                 IllegalArgumentException.class);
673     }
674
675     @Test
676     public void testGetIdentifier() {
677         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
678         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
679                 TransactionType.READ_ONLY);
680
681         Object id = transactionProxy.getIdentifier();
682         assertNotNull("getIdentifier returned null", id);
683         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
684     }
685
686     @Test
687     public void testClose() throws Exception{
688         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
689
690         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
691                 eq(actorSelection(actorRef)), eqSerializedReadData());
692
693         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
694
695         transactionProxy.read(TestModel.TEST_PATH);
696
697         transactionProxy.close();
698
699         verify(mockActorContext).sendOperationAsync(
700                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
701     }
702
703
704     /**
705      * Method to test a local Tx actor. The Tx paths are matched to decide if the
706      * Tx actor is local or not. This is done by mocking the Tx actor path
707      * and the caller paths and ensuring that the paths have the remote-address format
708      *
709      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
710      * the paths returned for the actors for all the tests are not qualified remote paths.
711      * Hence are treated as non-local/remote actors. In short, all tests except
712      * few below run for remote actors
713      *
714      * @throws Exception
715      */
716     @Test
717     public void testLocalTxActorRead() throws Exception {
718         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
719         doReturn(true).when(mockActorContext).isPathLocal(anyString());
720
721         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
722
723         // negative test case with null as the reply
724         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
725             any(ActorSelection.class), eqReadData());
726
727         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
728             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
729
730         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
731
732         // test case with node as read data reply
733         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
734
735         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
736             any(ActorSelection.class), eqReadData());
737
738         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
739
740         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
741
742         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
743
744         // test for local data exists
745         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
746             any(ActorSelection.class), eqDataExists());
747
748         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
749
750         assertEquals("Exists response", true, exists);
751     }
752
753     @Test
754     public void testLocalTxActorReady() throws Exception {
755         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
756         doReturn(true).when(mockActorContext).isPathLocal(anyString());
757
758         expectBatchedModificationsReady(actorRef, true);
759
760         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
761
762         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
763         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
764
765         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
766
767         assertTrue(ready instanceof SingleCommitCohortProxy);
768
769         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
770     }
771
772     private static interface TransactionProxyOperation {
773         void run(TransactionProxy transactionProxy);
774     }
775
776     private void throttleOperation(TransactionProxyOperation operation) {
777         throttleOperation(operation, 1, true);
778     }
779
780     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
781         ActorSystem actorSystem = getSystem();
782         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
783
784         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
785
786         doReturn(actorSystem.actorSelection(shardActorRef.path())).
787                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
788
789         if(shardFound) {
790             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
791                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
792         } else {
793             doReturn(Futures.failed(new Exception("not found")))
794                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
795         }
796
797         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
798         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
799                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
800                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
801
802         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
803                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
804                         eqCreateTransaction(memberName, READ_WRITE));
805
806         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
807
808         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
809
810         long start = System.nanoTime();
811
812         operation.run(transactionProxy);
813
814         long end = System.nanoTime();
815
816         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
817         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
818                 expected, (end-start)), (end - start) > expected);
819
820     }
821
822     private void completeOperation(TransactionProxyOperation operation){
823         completeOperation(operation, true);
824     }
825
826     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
827         ActorSystem actorSystem = getSystem();
828         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
829
830         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
831
832         doReturn(actorSystem.actorSelection(shardActorRef.path())).
833                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
834
835         if(shardFound) {
836             doReturn(primaryShardInfoReply(actorSystem, shardActorRef)).
837                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
838         } else {
839             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
840                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
841         }
842
843         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
844         String actorPath = txActorRef.path().toString();
845         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
846                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
847                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
848
849         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
850
851         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
852                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
853                         eqCreateTransaction(memberName, READ_WRITE));
854
855         doReturn(true).when(mockActorContext).isPathLocal(anyString());
856
857         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
858
859         long start = System.nanoTime();
860
861         operation.run(transactionProxy);
862
863         long end = System.nanoTime();
864
865         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
866         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
867                 expected, (end-start)), (end - start) <= expected);
868     }
869
870     public void testWriteThrottling(boolean shardFound){
871
872         throttleOperation(new TransactionProxyOperation() {
873             @Override
874             public void run(TransactionProxy transactionProxy) {
875                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
876
877                 expectBatchedModifications(2);
878
879                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
880
881                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
882             }
883         }, 1, shardFound);
884     }
885
886     @Test
887     public void testWriteThrottlingWhenShardFound(){
888         dataStoreContextBuilder.shardBatchedModificationCount(1);
889         throttleOperation(new TransactionProxyOperation() {
890             @Override
891             public void run(TransactionProxy transactionProxy) {
892                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
893
894                 expectIncompleteBatchedModifications();
895
896                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
897
898                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
899             }
900         });
901     }
902
903     @Test
904     public void testWriteThrottlingWhenShardNotFound(){
905         // Confirm that there is no throttling when the Shard is not found
906         dataStoreContextBuilder.shardBatchedModificationCount(1);
907         completeOperation(new TransactionProxyOperation() {
908             @Override
909             public void run(TransactionProxy transactionProxy) {
910                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
911
912                 expectBatchedModifications(2);
913
914                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
915
916                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
917             }
918         }, false);
919
920     }
921
922
923     @Test
924     public void testWriteCompletion(){
925         dataStoreContextBuilder.shardBatchedModificationCount(1);
926         completeOperation(new TransactionProxyOperation() {
927             @Override
928             public void run(TransactionProxy transactionProxy) {
929                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
930
931                 expectBatchedModifications(2);
932
933                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
934
935                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
936             }
937         });
938     }
939
940     @Test
941     public void testMergeThrottlingWhenShardFound(){
942         dataStoreContextBuilder.shardBatchedModificationCount(1);
943         throttleOperation(new TransactionProxyOperation() {
944             @Override
945             public void run(TransactionProxy transactionProxy) {
946                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
947
948                 expectIncompleteBatchedModifications();
949
950                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
951
952                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
953             }
954         });
955     }
956
957     @Test
958     public void testMergeThrottlingWhenShardNotFound(){
959         dataStoreContextBuilder.shardBatchedModificationCount(1);
960         completeOperation(new TransactionProxyOperation() {
961             @Override
962             public void run(TransactionProxy transactionProxy) {
963                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
964
965                 expectBatchedModifications(2);
966
967                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
968
969                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
970             }
971         }, false);
972     }
973
974     @Test
975     public void testMergeCompletion(){
976         dataStoreContextBuilder.shardBatchedModificationCount(1);
977         completeOperation(new TransactionProxyOperation() {
978             @Override
979             public void run(TransactionProxy transactionProxy) {
980                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
981
982                 expectBatchedModifications(2);
983
984                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
985
986                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
987             }
988         });
989
990     }
991
992     @Test
993     public void testDeleteThrottlingWhenShardFound(){
994
995         throttleOperation(new TransactionProxyOperation() {
996             @Override
997             public void run(TransactionProxy transactionProxy) {
998                 expectIncompleteBatchedModifications();
999
1000                 transactionProxy.delete(TestModel.TEST_PATH);
1001
1002                 transactionProxy.delete(TestModel.TEST_PATH);
1003             }
1004         });
1005     }
1006
1007
1008     @Test
1009     public void testDeleteThrottlingWhenShardNotFound(){
1010
1011         completeOperation(new TransactionProxyOperation() {
1012             @Override
1013             public void run(TransactionProxy transactionProxy) {
1014                 expectBatchedModifications(2);
1015
1016                 transactionProxy.delete(TestModel.TEST_PATH);
1017
1018                 transactionProxy.delete(TestModel.TEST_PATH);
1019             }
1020         }, false);
1021     }
1022
1023     @Test
1024     public void testDeleteCompletion(){
1025         dataStoreContextBuilder.shardBatchedModificationCount(1);
1026         completeOperation(new TransactionProxyOperation() {
1027             @Override
1028             public void run(TransactionProxy transactionProxy) {
1029                 expectBatchedModifications(2);
1030
1031                 transactionProxy.delete(TestModel.TEST_PATH);
1032
1033                 transactionProxy.delete(TestModel.TEST_PATH);
1034             }
1035         });
1036
1037     }
1038
1039     @Test
1040     public void testReadThrottlingWhenShardFound(){
1041
1042         throttleOperation(new TransactionProxyOperation() {
1043             @Override
1044             public void run(TransactionProxy transactionProxy) {
1045                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1046                         any(ActorSelection.class), eqReadData());
1047
1048                 transactionProxy.read(TestModel.TEST_PATH);
1049
1050                 transactionProxy.read(TestModel.TEST_PATH);
1051             }
1052         });
1053     }
1054
1055     @Test
1056     public void testReadThrottlingWhenShardNotFound(){
1057
1058         completeOperation(new TransactionProxyOperation() {
1059             @Override
1060             public void run(TransactionProxy transactionProxy) {
1061                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1062                         any(ActorSelection.class), eqReadData());
1063
1064                 transactionProxy.read(TestModel.TEST_PATH);
1065
1066                 transactionProxy.read(TestModel.TEST_PATH);
1067             }
1068         }, false);
1069     }
1070
1071
1072     @Test
1073     public void testReadCompletion(){
1074         completeOperation(new TransactionProxyOperation() {
1075             @Override
1076             public void run(TransactionProxy transactionProxy) {
1077                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1078
1079                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1080                         any(ActorSelection.class), eqReadData());
1081
1082                 transactionProxy.read(TestModel.TEST_PATH);
1083
1084                 transactionProxy.read(TestModel.TEST_PATH);
1085             }
1086         });
1087
1088     }
1089
1090     @Test
1091     public void testExistsThrottlingWhenShardFound(){
1092
1093         throttleOperation(new TransactionProxyOperation() {
1094             @Override
1095             public void run(TransactionProxy transactionProxy) {
1096                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1097                         any(ActorSelection.class), eqDataExists());
1098
1099                 transactionProxy.exists(TestModel.TEST_PATH);
1100
1101                 transactionProxy.exists(TestModel.TEST_PATH);
1102             }
1103         });
1104     }
1105
1106     @Test
1107     public void testExistsThrottlingWhenShardNotFound(){
1108
1109         completeOperation(new TransactionProxyOperation() {
1110             @Override
1111             public void run(TransactionProxy transactionProxy) {
1112                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1113                         any(ActorSelection.class), eqDataExists());
1114
1115                 transactionProxy.exists(TestModel.TEST_PATH);
1116
1117                 transactionProxy.exists(TestModel.TEST_PATH);
1118             }
1119         }, false);
1120     }
1121
1122
1123     @Test
1124     public void testExistsCompletion(){
1125         completeOperation(new TransactionProxyOperation() {
1126             @Override
1127             public void run(TransactionProxy transactionProxy) {
1128                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1129                         any(ActorSelection.class), eqDataExists());
1130
1131                 transactionProxy.exists(TestModel.TEST_PATH);
1132
1133                 transactionProxy.exists(TestModel.TEST_PATH);
1134             }
1135         });
1136
1137     }
1138
1139     @Test
1140     public void testReadyThrottling(){
1141
1142         throttleOperation(new TransactionProxyOperation() {
1143             @Override
1144             public void run(TransactionProxy transactionProxy) {
1145                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1146
1147                 expectBatchedModifications(1);
1148
1149                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1150                         any(ActorSelection.class), any(ReadyTransaction.class));
1151
1152                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1153
1154                 transactionProxy.ready();
1155             }
1156         });
1157     }
1158
1159     @Test
1160     public void testReadyThrottlingWithTwoTransactionContexts(){
1161
1162         throttleOperation(new TransactionProxyOperation() {
1163             @Override
1164             public void run(TransactionProxy transactionProxy) {
1165                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1166                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1167
1168                 expectBatchedModifications(2);
1169
1170                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1171                         any(ActorSelection.class), any(ReadyTransaction.class));
1172
1173                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1174
1175                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1176
1177                 transactionProxy.ready();
1178             }
1179         }, 2, true);
1180     }
1181
1182     private void testModificationOperationBatching(TransactionType type) throws Exception {
1183         int shardBatchedModificationCount = 3;
1184         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1185
1186         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1187
1188         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1189
1190         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1191         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1192
1193         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1194         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1195
1196         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1197         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1198
1199         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1200         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1201
1202         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1203         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1204
1205         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1206         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1207
1208         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1209         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1210
1211         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1212
1213         transactionProxy.write(writePath1, writeNode1);
1214         transactionProxy.write(writePath2, writeNode2);
1215         transactionProxy.delete(deletePath1);
1216         transactionProxy.merge(mergePath1, mergeNode1);
1217         transactionProxy.merge(mergePath2, mergeNode2);
1218         transactionProxy.write(writePath3, writeNode3);
1219         transactionProxy.merge(mergePath3, mergeNode3);
1220         transactionProxy.delete(deletePath2);
1221
1222         // This sends the last batch.
1223         transactionProxy.ready();
1224
1225         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1226         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1227
1228         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1229                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1230
1231         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1232                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1233
1234         verifyBatchedModifications(batchedModifications.get(2), true, true,
1235                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1236
1237         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1238     }
1239
1240     @Test
1241     public void testReadWriteModificationOperationBatching() throws Throwable {
1242         testModificationOperationBatching(READ_WRITE);
1243     }
1244
1245     @Test
1246     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1247         testModificationOperationBatching(WRITE_ONLY);
1248     }
1249
1250     @Test
1251     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1252         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1253         testModificationOperationBatching(WRITE_ONLY);
1254     }
1255
1256     @Test
1257     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1258
1259         int shardBatchedModificationCount = 10;
1260         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1261
1262         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1263
1264         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1265
1266         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1267         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1268
1269         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1270         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1271
1272         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1273         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1274
1275         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1276         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1277
1278         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1279
1280         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1281                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1282
1283         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1284                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1285
1286         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1287                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1288
1289         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1290
1291         transactionProxy.write(writePath1, writeNode1);
1292         transactionProxy.write(writePath2, writeNode2);
1293
1294         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1295                 get(5, TimeUnit.SECONDS);
1296
1297         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1298         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1299
1300         transactionProxy.merge(mergePath1, mergeNode1);
1301         transactionProxy.merge(mergePath2, mergeNode2);
1302
1303         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1304
1305         transactionProxy.delete(deletePath);
1306
1307         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1308         assertEquals("Exists response", true, exists);
1309
1310         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1311         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1312
1313         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1314         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1315
1316         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1317                 new WriteModification(writePath2, writeNode2));
1318
1319         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1320                 new MergeModification(mergePath2, mergeNode2));
1321
1322         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1323
1324         InOrder inOrder = Mockito.inOrder(mockActorContext);
1325         inOrder.verify(mockActorContext).executeOperationAsync(
1326                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1327
1328         inOrder.verify(mockActorContext).executeOperationAsync(
1329                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1330
1331         inOrder.verify(mockActorContext).executeOperationAsync(
1332                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1333
1334         inOrder.verify(mockActorContext).executeOperationAsync(
1335                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1336
1337         inOrder.verify(mockActorContext).executeOperationAsync(
1338                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1339
1340         inOrder.verify(mockActorContext).executeOperationAsync(
1341                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1342     }
1343
1344     @Test
1345     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1346
1347         SchemaContext schemaContext = SchemaContextHelper.full();
1348         Configuration configuration = mock(Configuration.class);
1349         doReturn(configuration).when(mockActorContext).getConfiguration();
1350         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1351         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1352
1353         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1354         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1355
1356         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1357         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1358
1359         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1360
1361         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1362
1363         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1364
1365         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1366
1367         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1368                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1369
1370         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1371
1372         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1373
1374         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1375
1376         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1377
1378         for(NormalizedNode<?,?> node : collection){
1379             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1380         }
1381
1382         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1383                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1384
1385         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1386
1387         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1388                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1389
1390         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1391     }
1392
1393
1394     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1395         ActorSystem actorSystem = getSystem();
1396         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1397
1398         doReturn(getSystem().actorSelection(shardActorRef.path())).
1399                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1400
1401         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1402                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1403
1404         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1405
1406         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1407
1408         doReturn(actorSystem.actorSelection(txActorRef.path())).
1409                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1410
1411         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1412                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1413                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1414
1415         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1416                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1417     }
1418 }