cc9692bfd91b72f3245f2bb6bd160f12551720b5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
43 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
46 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
49 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
50 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import scala.concurrent.Promise;
63
64 @SuppressWarnings("resource")
65 public class TransactionProxyTest extends AbstractTransactionProxyTest {
66
67     @SuppressWarnings("serial")
68     static class TestException extends RuntimeException {
69     }
70
71     static interface Invoker {
72         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
73     }
74
75     @Test
76     public void testRead() throws Exception {
77         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
78
79         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
80
81         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
82                 eq(actorSelection(actorRef)), eqSerializedReadData());
83
84         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
85                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
86
87         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
88
89         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
90
91         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
92                 eq(actorSelection(actorRef)), eqSerializedReadData());
93
94         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
95
96         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
97
98         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
99     }
100
101     @Test(expected = ReadFailedException.class)
102     public void testReadWithInvalidReplyMessageType() throws Exception {
103         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
104
105         doReturn(Futures.successful(new Object())).when(mockActorContext).
106                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
107
108         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
109
110         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
111     }
112
113     @Test(expected = TestException.class)
114     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
115         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
116
117         doReturn(Futures.failed(new TestException())).when(mockActorContext).
118                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
119
120         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
121
122         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
123     }
124
125     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
126             throws Throwable {
127         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
128
129         if (exToThrow instanceof PrimaryNotFoundException) {
130             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
131         } else {
132             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
133                     when(mockActorContext).findPrimaryShardAsync(anyString());
134         }
135
136         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
137                 any(ActorSelection.class), any());
138
139         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
140
141         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
142     }
143
144     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
145         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
146             @Override
147             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
148                 return proxy.read(TestModel.TEST_PATH);
149             }
150         });
151     }
152
153     @Test(expected = PrimaryNotFoundException.class)
154     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
155         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
156     }
157
158     @Test(expected = TimeoutException.class)
159     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
160         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
161                 new Exception("reason")));
162     }
163
164     @Test(expected = TestException.class)
165     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
166         testReadWithExceptionOnInitialCreateTransaction(new TestException());
167     }
168
169     @Test
170     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
171         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
172
173         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174
175         expectBatchedModifications(actorRef, 1);
176
177         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
178                 eq(actorSelection(actorRef)), eqSerializedReadData());
179
180         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
181
182         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
183
184         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
185                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
186
187         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
188         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
189
190         InOrder inOrder = Mockito.inOrder(mockActorContext);
191         inOrder.verify(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
193
194         inOrder.verify(mockActorContext).executeOperationAsync(
195                 eq(actorSelection(actorRef)), eqSerializedReadData());
196     }
197
198     @Test(expected=IllegalStateException.class)
199     public void testReadPreConditionCheck() {
200         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
201         transactionProxy.read(TestModel.TEST_PATH);
202     }
203
204     @Test(expected=IllegalArgumentException.class)
205     public void testInvalidCreateTransactionReply() throws Throwable {
206         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
207
208         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
209             actorSelection(actorRef.path().toString());
210
211         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
212             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
213
214         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
215             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
216
217         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
218
219         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
220     }
221
222     @Test
223     public void testExists() throws Exception {
224         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
225
226         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
227
228         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
229                 eq(actorSelection(actorRef)), eqSerializedDataExists());
230
231         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
232
233         assertEquals("Exists response", false, exists);
234
235         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
236                 eq(actorSelection(actorRef)), eqSerializedDataExists());
237
238         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
239
240         assertEquals("Exists response", true, exists);
241     }
242
243     @Test(expected = PrimaryNotFoundException.class)
244     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
245         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
246             @Override
247             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
248                 return proxy.exists(TestModel.TEST_PATH);
249             }
250         });
251     }
252
253     @Test(expected = ReadFailedException.class)
254     public void testExistsWithInvalidReplyMessageType() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
256
257         doReturn(Futures.successful(new Object())).when(mockActorContext).
258                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
259
260         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
261                 READ_ONLY);
262
263         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
264     }
265
266     @Test(expected = TestException.class)
267     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
268         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
269
270         doReturn(Futures.failed(new TestException())).when(mockActorContext).
271                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
272
273         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
274
275         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
276     }
277
278     @Test
279     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
280         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
281
282         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
283
284         expectBatchedModifications(actorRef, 1);
285
286         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
287                 eq(actorSelection(actorRef)), eqSerializedDataExists());
288
289         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
290
291         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
292
293         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
294
295         assertEquals("Exists response", true, exists);
296
297         InOrder inOrder = Mockito.inOrder(mockActorContext);
298         inOrder.verify(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
300
301         inOrder.verify(mockActorContext).executeOperationAsync(
302                 eq(actorSelection(actorRef)), eqSerializedDataExists());
303     }
304
305     @Test(expected=IllegalStateException.class)
306     public void testExistsPreConditionCheck() {
307         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
308         transactionProxy.exists(TestModel.TEST_PATH);
309     }
310
311     @Test
312     public void testWrite() throws Exception {
313         dataStoreContextBuilder.shardBatchedModificationCount(1);
314         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
315
316         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
317
318         expectBatchedModifications(actorRef, 1);
319
320         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
321
322         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
323
324         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
325     }
326
327     @Test
328     public void testWriteAfterAsyncRead() throws Throwable {
329         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
330
331         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
332         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
333                 eq(getSystem().actorSelection(actorRef.path())),
334                 eqCreateTransaction(memberName, READ_WRITE));
335
336         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
337                 eq(actorSelection(actorRef)), eqSerializedReadData());
338
339         expectBatchedModificationsReady(actorRef);
340
341         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
342
343         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
344
345         final CountDownLatch readComplete = new CountDownLatch(1);
346         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
347         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
348                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
349                     @Override
350                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
351                         try {
352                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
353                         } catch (Exception e) {
354                             caughtEx.set(e);
355                         } finally {
356                             readComplete.countDown();
357                         }
358                     }
359
360                     @Override
361                     public void onFailure(Throwable t) {
362                         caughtEx.set(t);
363                         readComplete.countDown();
364                     }
365                 });
366
367         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
368
369         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
370
371         if(caughtEx.get() != null) {
372             throw caughtEx.get();
373         }
374
375         // This sends the batched modification.
376         transactionProxy.ready();
377
378         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
379     }
380
381     @Test(expected=IllegalStateException.class)
382     public void testWritePreConditionCheck() {
383         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
384         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385     }
386
387     @Test(expected=IllegalStateException.class)
388     public void testWriteAfterReadyPreConditionCheck() {
389         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
390
391         transactionProxy.ready();
392
393         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
394     }
395
396     @Test
397     public void testMerge() throws Exception {
398         dataStoreContextBuilder.shardBatchedModificationCount(1);
399         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
400
401         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
402
403         expectBatchedModifications(actorRef, 1);
404
405         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
406
407         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
408
409         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
410     }
411
412     @Test
413     public void testDelete() throws Exception {
414         dataStoreContextBuilder.shardBatchedModificationCount(1);
415         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
416
417         expectBatchedModifications(actorRef, 1);
418
419         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
420
421         transactionProxy.delete(TestModel.TEST_PATH);
422
423         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
424     }
425
426     @Test
427     public void testReadWrite() throws Exception {
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
429
430         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
431
432         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
433                 eq(actorSelection(actorRef)), eqSerializedReadData());
434
435         expectBatchedModifications(actorRef, 1);
436
437         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
438
439         transactionProxy.read(TestModel.TEST_PATH);
440
441         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
442
443         transactionProxy.read(TestModel.TEST_PATH);
444
445         transactionProxy.read(TestModel.TEST_PATH);
446
447         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
448         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
449
450         verifyBatchedModifications(batchedModifications.get(0), false,
451                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
452     }
453
454     @Test
455     public void testReadyWithReadWrite() throws Exception {
456         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
457
458         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459
460         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
461                 eq(actorSelection(actorRef)), eqSerializedReadData());
462
463         expectBatchedModificationsReady(actorRef);
464
465         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
466
467         transactionProxy.read(TestModel.TEST_PATH);
468
469         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
470
471         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
472
473         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
474
475         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
476
477         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
478
479         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
480         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
481
482         verifyBatchedModifications(batchedModifications.get(0), true,
483                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
484
485         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
486     }
487
488     @Test
489     public void testReadyWithNoModifications() throws Exception {
490         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
491
492         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
493                 eq(actorSelection(actorRef)), eqSerializedReadData());
494
495         expectBatchedModificationsReady(actorRef);
496
497         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
498
499         transactionProxy.read(TestModel.TEST_PATH);
500
501         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
502
503         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
504
505         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
506
507         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
508
509         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
510         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
511
512         verifyBatchedModifications(batchedModifications.get(0), true);
513     }
514
515     @Test
516     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
517         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
518
519         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
520
521         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
522
523         expectBatchedModificationsReady(actorRef);
524
525         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
526
527         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
528
529         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
530
531         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
532
533         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
534
535         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
536
537         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
538         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
539
540         verifyBatchedModifications(batchedModifications.get(0), true,
541                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
542
543         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
544                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
545     }
546
547     @Test
548     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
549         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
563
564         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
565
566         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
567
568         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
569         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
570
571         verifyBatchedModifications(batchedModifications.get(0), false,
572                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
573
574         verifyBatchedModifications(batchedModifications.get(1), true);
575
576         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
577                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
578     }
579
580     @Test
581     public void testReadyWithReplyFailure() throws Exception {
582         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
583
584         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
585
586         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
587
588         expectFailedBatchedModifications(actorRef);
589
590         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
591
592         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
593
594         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
595
596         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
597
598         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
599
600         verifyCohortFutures(proxy, TestException.class);
601     }
602
603     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
604         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
605
606         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
607
608         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
609
610         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
611
612         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
613
614         transactionProxy.delete(TestModel.TEST_PATH);
615
616         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
617
618         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
619
620         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
621
622         verifyCohortFutures(proxy, toThrow.getClass());
623     }
624
625     @Test
626     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
627         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
628     }
629
630     @Test
631     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
632         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
633     }
634
635     @Test
636     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
637         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
638     }
639
640     @Test
641     public void testReadyWithInvalidReplyMessageType() throws Exception {
642         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
643         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
644
645         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
646
647         //expectBatchedModifications(actorRef, 1);
648
649         doReturn(Futures.successful(new Object())).when(mockActorContext).
650                 executeOperationAsync(eq(actorSelection(actorRef)),
651                         isA(BatchedModifications.class));
652
653         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
654
655         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
656
657         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
658
659         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
660
661         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
662
663         verifyCohortFutures(proxy, IllegalArgumentException.class);
664     }
665
666     @Test
667     public void testGetIdentifier() {
668         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
669         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
670                 TransactionProxy.TransactionType.READ_ONLY);
671
672         Object id = transactionProxy.getIdentifier();
673         assertNotNull("getIdentifier returned null", id);
674         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
675     }
676
677     @Test
678     public void testClose() throws Exception{
679         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
680
681         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
682                 eq(actorSelection(actorRef)), eqSerializedReadData());
683
684         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
685
686         transactionProxy.read(TestModel.TEST_PATH);
687
688         transactionProxy.close();
689
690         verify(mockActorContext).sendOperationAsync(
691                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
692     }
693
694
695     /**
696      * Method to test a local Tx actor. The Tx paths are matched to decide if the
697      * Tx actor is local or not. This is done by mocking the Tx actor path
698      * and the caller paths and ensuring that the paths have the remote-address format
699      *
700      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
701      * the paths returned for the actors for all the tests are not qualified remote paths.
702      * Hence are treated as non-local/remote actors. In short, all tests except
703      * few below run for remote actors
704      *
705      * @throws Exception
706      */
707     @Test
708     public void testLocalTxActorRead() throws Exception {
709         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
710         doReturn(true).when(mockActorContext).isPathLocal(anyString());
711
712         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
713
714         // negative test case with null as the reply
715         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
716             any(ActorSelection.class), eqReadData());
717
718         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
719             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
720
721         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
722
723         // test case with node as read data reply
724         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
725
726         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
727             any(ActorSelection.class), eqReadData());
728
729         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
730
731         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
732
733         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
734
735         // test for local data exists
736         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
737             any(ActorSelection.class), eqDataExists());
738
739         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
740
741         assertEquals("Exists response", true, exists);
742     }
743
744     @Test
745     public void testLocalTxActorReady() throws Exception {
746         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
747         doReturn(true).when(mockActorContext).isPathLocal(anyString());
748
749         expectBatchedModificationsReady(actorRef);
750
751         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
752
753         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
754         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
755
756         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
757
758         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
759
760         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
761
762         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
763     }
764
765     private static interface TransactionProxyOperation {
766         void run(TransactionProxy transactionProxy);
767     }
768
769     private void throttleOperation(TransactionProxyOperation operation) {
770         throttleOperation(operation, 1, true);
771     }
772
773     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
774         ActorSystem actorSystem = getSystem();
775         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
776
777         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
778
779         doReturn(actorSystem.actorSelection(shardActorRef.path())).
780                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
781
782         if(shardFound) {
783             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
784                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
785         } else {
786             doReturn(Futures.failed(new Exception("not found")))
787                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
788         }
789
790         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
791         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
792                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
793                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
794
795         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
796                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
797                         eqCreateTransaction(memberName, READ_WRITE));
798
799         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
800
801         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
802
803         long start = System.nanoTime();
804
805         operation.run(transactionProxy);
806
807         long end = System.nanoTime();
808
809         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
810         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
811                 expected, (end-start)), (end - start) > expected);
812
813     }
814
815     private void completeOperation(TransactionProxyOperation operation){
816         completeOperation(operation, true);
817     }
818
819     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
820         ActorSystem actorSystem = getSystem();
821         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
822
823         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
824
825         doReturn(actorSystem.actorSelection(shardActorRef.path())).
826                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
827
828         if(shardFound) {
829             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
830                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
831         } else {
832             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
833                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
834         }
835
836         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
837         String actorPath = txActorRef.path().toString();
838         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
839                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
840                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
841
842         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
843
844         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
845                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
846                         eqCreateTransaction(memberName, READ_WRITE));
847
848         doReturn(true).when(mockActorContext).isPathLocal(anyString());
849
850         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
851
852         long start = System.nanoTime();
853
854         operation.run(transactionProxy);
855
856         long end = System.nanoTime();
857
858         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
859         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
860                 expected, (end-start)), (end - start) <= expected);
861     }
862
863     public void testWriteThrottling(boolean shardFound){
864
865         throttleOperation(new TransactionProxyOperation() {
866             @Override
867             public void run(TransactionProxy transactionProxy) {
868                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
869
870                 expectBatchedModifications(2);
871
872                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
873
874                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
875             }
876         }, 1, shardFound);
877     }
878
879     @Test
880     public void testWriteThrottlingWhenShardFound(){
881         dataStoreContextBuilder.shardBatchedModificationCount(1);
882         throttleOperation(new TransactionProxyOperation() {
883             @Override
884             public void run(TransactionProxy transactionProxy) {
885                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
886
887                 expectIncompleteBatchedModifications();
888
889                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
890
891                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
892             }
893         });
894     }
895
896     @Test
897     public void testWriteThrottlingWhenShardNotFound(){
898         // Confirm that there is no throttling when the Shard is not found
899         dataStoreContextBuilder.shardBatchedModificationCount(1);
900         completeOperation(new TransactionProxyOperation() {
901             @Override
902             public void run(TransactionProxy transactionProxy) {
903                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
904
905                 expectBatchedModifications(2);
906
907                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
908
909                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
910             }
911         }, false);
912
913     }
914
915
916     @Test
917     public void testWriteCompletion(){
918         dataStoreContextBuilder.shardBatchedModificationCount(1);
919         completeOperation(new TransactionProxyOperation() {
920             @Override
921             public void run(TransactionProxy transactionProxy) {
922                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
923
924                 expectBatchedModifications(2);
925
926                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
927
928                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
929             }
930         });
931     }
932
933     @Test
934     public void testMergeThrottlingWhenShardFound(){
935         dataStoreContextBuilder.shardBatchedModificationCount(1);
936         throttleOperation(new TransactionProxyOperation() {
937             @Override
938             public void run(TransactionProxy transactionProxy) {
939                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
940
941                 expectIncompleteBatchedModifications();
942
943                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
944
945                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
946             }
947         });
948     }
949
950     @Test
951     public void testMergeThrottlingWhenShardNotFound(){
952         dataStoreContextBuilder.shardBatchedModificationCount(1);
953         completeOperation(new TransactionProxyOperation() {
954             @Override
955             public void run(TransactionProxy transactionProxy) {
956                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
957
958                 expectBatchedModifications(2);
959
960                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
961
962                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
963             }
964         }, false);
965     }
966
967     @Test
968     public void testMergeCompletion(){
969         dataStoreContextBuilder.shardBatchedModificationCount(1);
970         completeOperation(new TransactionProxyOperation() {
971             @Override
972             public void run(TransactionProxy transactionProxy) {
973                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
974
975                 expectBatchedModifications(2);
976
977                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
978
979                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
980             }
981         });
982
983     }
984
985     @Test
986     public void testDeleteThrottlingWhenShardFound(){
987
988         throttleOperation(new TransactionProxyOperation() {
989             @Override
990             public void run(TransactionProxy transactionProxy) {
991                 expectIncompleteBatchedModifications();
992
993                 transactionProxy.delete(TestModel.TEST_PATH);
994
995                 transactionProxy.delete(TestModel.TEST_PATH);
996             }
997         });
998     }
999
1000
1001     @Test
1002     public void testDeleteThrottlingWhenShardNotFound(){
1003
1004         completeOperation(new TransactionProxyOperation() {
1005             @Override
1006             public void run(TransactionProxy transactionProxy) {
1007                 expectBatchedModifications(2);
1008
1009                 transactionProxy.delete(TestModel.TEST_PATH);
1010
1011                 transactionProxy.delete(TestModel.TEST_PATH);
1012             }
1013         }, false);
1014     }
1015
1016     @Test
1017     public void testDeleteCompletion(){
1018         dataStoreContextBuilder.shardBatchedModificationCount(1);
1019         completeOperation(new TransactionProxyOperation() {
1020             @Override
1021             public void run(TransactionProxy transactionProxy) {
1022                 expectBatchedModifications(2);
1023
1024                 transactionProxy.delete(TestModel.TEST_PATH);
1025
1026                 transactionProxy.delete(TestModel.TEST_PATH);
1027             }
1028         });
1029
1030     }
1031
1032     @Test
1033     public void testReadThrottlingWhenShardFound(){
1034
1035         throttleOperation(new TransactionProxyOperation() {
1036             @Override
1037             public void run(TransactionProxy transactionProxy) {
1038                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1039                         any(ActorSelection.class), eqReadData());
1040
1041                 transactionProxy.read(TestModel.TEST_PATH);
1042
1043                 transactionProxy.read(TestModel.TEST_PATH);
1044             }
1045         });
1046     }
1047
1048     @Test
1049     public void testReadThrottlingWhenShardNotFound(){
1050
1051         completeOperation(new TransactionProxyOperation() {
1052             @Override
1053             public void run(TransactionProxy transactionProxy) {
1054                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1055                         any(ActorSelection.class), eqReadData());
1056
1057                 transactionProxy.read(TestModel.TEST_PATH);
1058
1059                 transactionProxy.read(TestModel.TEST_PATH);
1060             }
1061         }, false);
1062     }
1063
1064
1065     @Test
1066     public void testReadCompletion(){
1067         completeOperation(new TransactionProxyOperation() {
1068             @Override
1069             public void run(TransactionProxy transactionProxy) {
1070                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1071
1072                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1073                         any(ActorSelection.class), eqReadData());
1074
1075                 transactionProxy.read(TestModel.TEST_PATH);
1076
1077                 transactionProxy.read(TestModel.TEST_PATH);
1078             }
1079         });
1080
1081     }
1082
1083     @Test
1084     public void testExistsThrottlingWhenShardFound(){
1085
1086         throttleOperation(new TransactionProxyOperation() {
1087             @Override
1088             public void run(TransactionProxy transactionProxy) {
1089                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1090                         any(ActorSelection.class), eqDataExists());
1091
1092                 transactionProxy.exists(TestModel.TEST_PATH);
1093
1094                 transactionProxy.exists(TestModel.TEST_PATH);
1095             }
1096         });
1097     }
1098
1099     @Test
1100     public void testExistsThrottlingWhenShardNotFound(){
1101
1102         completeOperation(new TransactionProxyOperation() {
1103             @Override
1104             public void run(TransactionProxy transactionProxy) {
1105                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1106                         any(ActorSelection.class), eqDataExists());
1107
1108                 transactionProxy.exists(TestModel.TEST_PATH);
1109
1110                 transactionProxy.exists(TestModel.TEST_PATH);
1111             }
1112         }, false);
1113     }
1114
1115
1116     @Test
1117     public void testExistsCompletion(){
1118         completeOperation(new TransactionProxyOperation() {
1119             @Override
1120             public void run(TransactionProxy transactionProxy) {
1121                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1122                         any(ActorSelection.class), eqDataExists());
1123
1124                 transactionProxy.exists(TestModel.TEST_PATH);
1125
1126                 transactionProxy.exists(TestModel.TEST_PATH);
1127             }
1128         });
1129
1130     }
1131
1132     @Test
1133     public void testReadyThrottling(){
1134
1135         throttleOperation(new TransactionProxyOperation() {
1136             @Override
1137             public void run(TransactionProxy transactionProxy) {
1138                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1139
1140                 expectBatchedModifications(1);
1141
1142                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1143                         any(ActorSelection.class), any(ReadyTransaction.class));
1144
1145                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1146
1147                 transactionProxy.ready();
1148             }
1149         });
1150     }
1151
1152     @Test
1153     public void testReadyThrottlingWithTwoTransactionContexts(){
1154
1155         throttleOperation(new TransactionProxyOperation() {
1156             @Override
1157             public void run(TransactionProxy transactionProxy) {
1158                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1159                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1160
1161                 expectBatchedModifications(2);
1162
1163                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1164                         any(ActorSelection.class), any(ReadyTransaction.class));
1165
1166                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1167
1168                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1169
1170                 transactionProxy.ready();
1171             }
1172         }, 2, true);
1173     }
1174
1175     private void testModificationOperationBatching(TransactionType type) throws Exception {
1176         int shardBatchedModificationCount = 3;
1177         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1178
1179         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1180
1181         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1182
1183         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1184         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1185
1186         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1187         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1188
1189         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1190         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1191
1192         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1193         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1194
1195         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1196         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1197
1198         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1199         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1200
1201         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1202         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1203
1204         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1205
1206         transactionProxy.write(writePath1, writeNode1);
1207         transactionProxy.write(writePath2, writeNode2);
1208         transactionProxy.delete(deletePath1);
1209         transactionProxy.merge(mergePath1, mergeNode1);
1210         transactionProxy.merge(mergePath2, mergeNode2);
1211         transactionProxy.write(writePath3, writeNode3);
1212         transactionProxy.merge(mergePath3, mergeNode3);
1213         transactionProxy.delete(deletePath2);
1214
1215         // This sends the last batch.
1216         transactionProxy.ready();
1217
1218         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1219         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1220
1221         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1222                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1223
1224         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1225                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1226
1227         verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
1228                 new DeleteModification(deletePath2));
1229
1230         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1231     }
1232
1233     @Test
1234     public void testReadWriteModificationOperationBatching() throws Throwable {
1235         testModificationOperationBatching(READ_WRITE);
1236     }
1237
1238     @Test
1239     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1240         testModificationOperationBatching(WRITE_ONLY);
1241     }
1242
1243     @Test
1244     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1245         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1246         testModificationOperationBatching(WRITE_ONLY);
1247     }
1248
1249     @Test
1250     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1251
1252         int shardBatchedModificationCount = 10;
1253         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1254
1255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1256
1257         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1258
1259         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1260         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1261
1262         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1263         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1264
1265         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1266         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1267
1268         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1269         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1270
1271         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1272
1273         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1274                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1275
1276         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1277                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1278
1279         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1280                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1281
1282         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1283
1284         transactionProxy.write(writePath1, writeNode1);
1285         transactionProxy.write(writePath2, writeNode2);
1286
1287         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1288                 get(5, TimeUnit.SECONDS);
1289
1290         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1291         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1292
1293         transactionProxy.merge(mergePath1, mergeNode1);
1294         transactionProxy.merge(mergePath2, mergeNode2);
1295
1296         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1297
1298         transactionProxy.delete(deletePath);
1299
1300         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1301         assertEquals("Exists response", true, exists);
1302
1303         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1304         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1305
1306         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1307         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1308
1309         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1310                 new WriteModification(writePath2, writeNode2));
1311
1312         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1313                 new MergeModification(mergePath2, mergeNode2));
1314
1315         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1316
1317         InOrder inOrder = Mockito.inOrder(mockActorContext);
1318         inOrder.verify(mockActorContext).executeOperationAsync(
1319                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1320
1321         inOrder.verify(mockActorContext).executeOperationAsync(
1322                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1323
1324         inOrder.verify(mockActorContext).executeOperationAsync(
1325                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1326
1327         inOrder.verify(mockActorContext).executeOperationAsync(
1328                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1329
1330         inOrder.verify(mockActorContext).executeOperationAsync(
1331                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1332
1333         inOrder.verify(mockActorContext).executeOperationAsync(
1334                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1335     }
1336
1337     @Test
1338     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1339
1340         SchemaContext schemaContext = SchemaContextHelper.full();
1341         Configuration configuration = mock(Configuration.class);
1342         doReturn(configuration).when(mockActorContext).getConfiguration();
1343         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1344         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1345
1346         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1347         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1348
1349         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1350         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1351
1352         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1353
1354         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1355
1356         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1357
1358         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1359
1360         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1361                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1362
1363         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1364
1365         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1366
1367         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1368
1369         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1370
1371         for(NormalizedNode<?,?> node : collection){
1372             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1373         }
1374
1375         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1376                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1377
1378         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1379
1380         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1381                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1382
1383         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1384     }
1385
1386
1387     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1388         ActorSystem actorSystem = getSystem();
1389         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1390
1391         doReturn(getSystem().actorSelection(shardActorRef.path())).
1392                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1393
1394         doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
1395                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1396
1397         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1398
1399         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1400
1401         doReturn(actorSystem.actorSelection(txActorRef.path())).
1402                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1403
1404         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1405                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1406                         eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1407
1408         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1409                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
1410     }
1411 }