Deprecate ReadData/DataExists protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
52 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
61 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
62 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqReadData());
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqReadData());
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqReadData());
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqReadData());
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqDataExists());
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqDataExists());
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqDataExists());
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqDataExists());
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqReadData());
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqReadData());
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqReadData());
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqReadData());
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571     }
572
573     @Test
574     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
575         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
576         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577
578         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579
580         expectBatchedModificationsReady(actorRef, true);
581
582         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583
584         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585
586         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587
588         assertTrue(ready instanceof SingleCommitCohortProxy);
589
590         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591
592         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
593         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
594
595         verifyBatchedModifications(batchedModifications.get(0), false,
596                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
597
598         verifyBatchedModifications(batchedModifications.get(1), true, true);
599     }
600
601     @Test
602     public void testReadyWithReplyFailure() throws Exception {
603         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604
605         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606
607         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608
609         expectFailedBatchedModifications(actorRef);
610
611         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616
617         assertTrue(ready instanceof SingleCommitCohortProxy);
618
619         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
620     }
621
622     @Test
623     public void testReadyWithDebugContextEnabled() throws Exception {
624         dataStoreContextBuilder.transactionDebugContextEnabled(true);
625
626         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631
632         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637
638         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
639     }
640
641     @Test
642     public void testReadyWithLocalTransaction() throws Exception {
643         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644
645         doReturn(getSystem().actorSelection(shardActorRef.path())).
646                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
647
648         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
649                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         expectReadyLocalTransaction(shardActorRef, true);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
659         assertTrue(ready instanceof SingleCommitCohortProxy);
660         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
661     }
662
663     @Test
664     public void testReadyWithLocalTransactionWithFailure() throws Exception {
665         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666
667         doReturn(getSystem().actorSelection(shardActorRef.path())).
668                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
669
670         Optional<DataTree> mockDataTree = createDataTree();
671         DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification();
672         doThrow(new RuntimeException("mock")).when(mockModification).ready();
673
674         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
675                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676
677         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678
679         expectReadyLocalTransaction(shardActorRef, true);
680
681         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
682         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683
684         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685         assertTrue(ready instanceof SingleCommitCohortProxy);
686         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
687     }
688
689     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
690         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691
692         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693
694         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695
696         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699
700         transactionProxy.delete(TestModel.TEST_PATH);
701
702         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705
706         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
707     }
708
709     @Test
710     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
711         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
712     }
713
714     @Test
715     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
716         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
717     }
718
719     @Test
720     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
721         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
722     }
723
724     @Test
725     public void testReadyWithInvalidReplyMessageType() throws Exception {
726         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
727         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728
729         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
730
731         doReturn(Futures.successful(new Object())).when(mockActorContext).
732                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
733
734         expectBatchedModificationsReady(actorRef2);
735
736         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
737
738         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
739         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
740
741         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
742
743         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
744
745         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
746                 IllegalArgumentException.class);
747     }
748
749     @Test
750     public void testGetIdentifier() {
751         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
752         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
753
754         Object id = transactionProxy.getIdentifier();
755         assertNotNull("getIdentifier returned null", id);
756         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
757     }
758
759     @Test
760     public void testClose() throws Exception{
761         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
762
763         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
764                 eq(actorSelection(actorRef)), eqReadData());
765
766         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
767
768         transactionProxy.read(TestModel.TEST_PATH);
769
770         transactionProxy.close();
771
772         verify(mockActorContext).sendOperationAsync(
773                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
774     }
775
776     private static interface TransactionProxyOperation {
777         void run(TransactionProxy transactionProxy);
778     }
779
780     private void throttleOperation(TransactionProxyOperation operation) {
781         throttleOperation(operation, 1, true);
782     }
783
784     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
785         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
786                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
787     }
788
789     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
790         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
791                 Optional.<DataTree>absent());
792     }
793
794     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
795         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
796                 dataTreeOptional);
797     }
798
799
800     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
801         ActorSystem actorSystem = getSystem();
802         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
803
804         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
805         // we now allow one extra permit to be allowed for ready
806         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
807                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
808
809         doReturn(actorSystem.actorSelection(shardActorRef.path())).
810                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
811
812         if(shardFound) {
813             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
814                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
815             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
816                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
817
818         } else {
819             doReturn(Futures.failed(new Exception("not found")))
820                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
821         }
822
823         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
824
825         doReturn(incompleteFuture()).when(mockActorContext).
826         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
827                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
828
829         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
830
831         long start = System.nanoTime();
832
833         operation.run(transactionProxy);
834
835         long end = System.nanoTime();
836
837         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
838                 expectedCompletionTime, (end-start)),
839                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
840
841     }
842
843     private void completeOperation(TransactionProxyOperation operation){
844         completeOperation(operation, true);
845     }
846
847     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
848         ActorSystem actorSystem = getSystem();
849         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
850
851         doReturn(actorSystem.actorSelection(shardActorRef.path())).
852                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
853
854         if(shardFound) {
855             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
856                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
857         } else {
858             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
859                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
860         }
861
862         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
863         String actorPath = txActorRef.path().toString();
864         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
865                 DataStoreVersions.CURRENT_VERSION);
866
867         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
868
869         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
870                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
871                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
872
873         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
874
875         long start = System.nanoTime();
876
877         operation.run(transactionProxy);
878
879         long end = System.nanoTime();
880
881         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
882         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
883                 expected, (end-start)), (end - start) <= expected);
884     }
885
886     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
887         ActorSystem actorSystem = getSystem();
888         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
889
890         doReturn(actorSystem.actorSelection(shardActorRef.path())).
891                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
892
893         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
894                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
895
896         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
897
898         long start = System.nanoTime();
899
900         operation.run(transactionProxy);
901
902         long end = System.nanoTime();
903
904         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
905         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
906                 expected, (end-start)), (end - start) <= expected);
907     }
908
909     private static Optional<DataTree> createDataTree(){
910         DataTree dataTree = mock(DataTree.class);
911         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
912         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
913         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
914
915         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
916         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
917
918         return dataTreeOptional;
919     }
920
921     private static Optional<DataTree> createDataTree(NormalizedNode<?, ?> readResponse){
922         DataTree dataTree = mock(DataTree.class);
923         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
924         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
925         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
926
927         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
928         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
929         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
930
931         return dataTreeOptional;
932     }
933
934
935     @Test
936     public void testWriteCompletionForLocalShard(){
937         completeOperationLocal(new TransactionProxyOperation() {
938             @Override
939             public void run(TransactionProxy transactionProxy) {
940                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
941
942                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
943
944                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
945
946             }
947         }, createDataTree());
948     }
949
950     @Test
951     public void testWriteThrottlingWhenShardFound(){
952         throttleOperation(new TransactionProxyOperation() {
953             @Override
954             public void run(TransactionProxy transactionProxy) {
955                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
956
957                 expectIncompleteBatchedModifications();
958
959                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
960
961                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
962             }
963         });
964     }
965
966     @Test
967     public void testWriteThrottlingWhenShardNotFound(){
968         // Confirm that there is no throttling when the Shard is not found
969         completeOperation(new TransactionProxyOperation() {
970             @Override
971             public void run(TransactionProxy transactionProxy) {
972                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
973
974                 expectBatchedModifications(2);
975
976                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
977
978                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
979             }
980         }, false);
981
982     }
983
984
985     @Test
986     public void testWriteCompletion(){
987         completeOperation(new TransactionProxyOperation() {
988             @Override
989             public void run(TransactionProxy transactionProxy) {
990                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
991
992                 expectBatchedModifications(2);
993
994                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
995
996                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
997             }
998         });
999     }
1000
1001     @Test
1002     public void testMergeThrottlingWhenShardFound(){
1003         throttleOperation(new TransactionProxyOperation() {
1004             @Override
1005             public void run(TransactionProxy transactionProxy) {
1006                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1007
1008                 expectIncompleteBatchedModifications();
1009
1010                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1011
1012                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1013             }
1014         });
1015     }
1016
1017     @Test
1018     public void testMergeThrottlingWhenShardNotFound(){
1019         completeOperation(new TransactionProxyOperation() {
1020             @Override
1021             public void run(TransactionProxy transactionProxy) {
1022                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1023
1024                 expectBatchedModifications(2);
1025
1026                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1027
1028                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1029             }
1030         }, false);
1031     }
1032
1033     @Test
1034     public void testMergeCompletion(){
1035         completeOperation(new TransactionProxyOperation() {
1036             @Override
1037             public void run(TransactionProxy transactionProxy) {
1038                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1039
1040                 expectBatchedModifications(2);
1041
1042                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1043
1044                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1045             }
1046         });
1047
1048     }
1049
1050     @Test
1051     public void testMergeCompletionForLocalShard(){
1052         completeOperationLocal(new TransactionProxyOperation() {
1053             @Override
1054             public void run(TransactionProxy transactionProxy) {
1055                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1056
1057                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1058
1059                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1060
1061             }
1062         }, createDataTree());
1063     }
1064
1065
1066     @Test
1067     public void testDeleteThrottlingWhenShardFound(){
1068
1069         throttleOperation(new TransactionProxyOperation() {
1070             @Override
1071             public void run(TransactionProxy transactionProxy) {
1072                 expectIncompleteBatchedModifications();
1073
1074                 transactionProxy.delete(TestModel.TEST_PATH);
1075
1076                 transactionProxy.delete(TestModel.TEST_PATH);
1077             }
1078         });
1079     }
1080
1081
1082     @Test
1083     public void testDeleteThrottlingWhenShardNotFound(){
1084
1085         completeOperation(new TransactionProxyOperation() {
1086             @Override
1087             public void run(TransactionProxy transactionProxy) {
1088                 expectBatchedModifications(2);
1089
1090                 transactionProxy.delete(TestModel.TEST_PATH);
1091
1092                 transactionProxy.delete(TestModel.TEST_PATH);
1093             }
1094         }, false);
1095     }
1096
1097     @Test
1098     public void testDeleteCompletionForLocalShard(){
1099         completeOperationLocal(new TransactionProxyOperation() {
1100             @Override
1101             public void run(TransactionProxy transactionProxy) {
1102
1103                 transactionProxy.delete(TestModel.TEST_PATH);
1104
1105                 transactionProxy.delete(TestModel.TEST_PATH);
1106             }
1107         }, createDataTree());
1108
1109     }
1110
1111     @Test
1112     public void testDeleteCompletion(){
1113         completeOperation(new TransactionProxyOperation() {
1114             @Override
1115             public void run(TransactionProxy transactionProxy) {
1116                 expectBatchedModifications(2);
1117
1118                 transactionProxy.delete(TestModel.TEST_PATH);
1119
1120                 transactionProxy.delete(TestModel.TEST_PATH);
1121             }
1122         });
1123
1124     }
1125
1126     @Test
1127     public void testReadThrottlingWhenShardFound(){
1128
1129         throttleOperation(new TransactionProxyOperation() {
1130             @Override
1131             public void run(TransactionProxy transactionProxy) {
1132                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1133                         any(ActorSelection.class), eqReadData());
1134
1135                 transactionProxy.read(TestModel.TEST_PATH);
1136
1137                 transactionProxy.read(TestModel.TEST_PATH);
1138             }
1139         });
1140     }
1141
1142     @Test
1143     public void testReadThrottlingWhenShardNotFound(){
1144
1145         completeOperation(new TransactionProxyOperation() {
1146             @Override
1147             public void run(TransactionProxy transactionProxy) {
1148                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1149                         any(ActorSelection.class), eqReadData());
1150
1151                 transactionProxy.read(TestModel.TEST_PATH);
1152
1153                 transactionProxy.read(TestModel.TEST_PATH);
1154             }
1155         }, false);
1156     }
1157
1158
1159     @Test
1160     public void testReadCompletion(){
1161         completeOperation(new TransactionProxyOperation() {
1162             @Override
1163             public void run(TransactionProxy transactionProxy) {
1164                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1165
1166                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1167                         any(ActorSelection.class), eqReadData());
1168
1169                 transactionProxy.read(TestModel.TEST_PATH);
1170
1171                 transactionProxy.read(TestModel.TEST_PATH);
1172             }
1173         });
1174
1175     }
1176
1177     @Test
1178     public void testReadCompletionForLocalShard(){
1179         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1180         completeOperationLocal(new TransactionProxyOperation() {
1181             @Override
1182             public void run(TransactionProxy transactionProxy) {
1183                 transactionProxy.read(TestModel.TEST_PATH);
1184
1185                 transactionProxy.read(TestModel.TEST_PATH);
1186             }
1187         }, createDataTree(nodeToRead));
1188
1189     }
1190
1191     @Test
1192     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1193         completeOperationLocal(new TransactionProxyOperation() {
1194             @Override
1195             public void run(TransactionProxy transactionProxy) {
1196                 transactionProxy.read(TestModel.TEST_PATH);
1197
1198                 transactionProxy.read(TestModel.TEST_PATH);
1199             }
1200         }, createDataTree());
1201
1202     }
1203
1204     @Test
1205     public void testExistsThrottlingWhenShardFound(){
1206
1207         throttleOperation(new TransactionProxyOperation() {
1208             @Override
1209             public void run(TransactionProxy transactionProxy) {
1210                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1211                         any(ActorSelection.class), eqDataExists());
1212
1213                 transactionProxy.exists(TestModel.TEST_PATH);
1214
1215                 transactionProxy.exists(TestModel.TEST_PATH);
1216             }
1217         });
1218     }
1219
1220     @Test
1221     public void testExistsThrottlingWhenShardNotFound(){
1222
1223         completeOperation(new TransactionProxyOperation() {
1224             @Override
1225             public void run(TransactionProxy transactionProxy) {
1226                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1227                         any(ActorSelection.class), eqDataExists());
1228
1229                 transactionProxy.exists(TestModel.TEST_PATH);
1230
1231                 transactionProxy.exists(TestModel.TEST_PATH);
1232             }
1233         }, false);
1234     }
1235
1236
1237     @Test
1238     public void testExistsCompletion(){
1239         completeOperation(new TransactionProxyOperation() {
1240             @Override
1241             public void run(TransactionProxy transactionProxy) {
1242                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1243                         any(ActorSelection.class), eqDataExists());
1244
1245                 transactionProxy.exists(TestModel.TEST_PATH);
1246
1247                 transactionProxy.exists(TestModel.TEST_PATH);
1248             }
1249         });
1250
1251     }
1252
1253     @Test
1254     public void testExistsCompletionForLocalShard(){
1255         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1256         completeOperationLocal(new TransactionProxyOperation() {
1257             @Override
1258             public void run(TransactionProxy transactionProxy) {
1259                 transactionProxy.exists(TestModel.TEST_PATH);
1260
1261                 transactionProxy.exists(TestModel.TEST_PATH);
1262             }
1263         }, createDataTree(nodeToRead));
1264
1265     }
1266
1267     @Test
1268     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1269         completeOperationLocal(new TransactionProxyOperation() {
1270             @Override
1271             public void run(TransactionProxy transactionProxy) {
1272                 transactionProxy.exists(TestModel.TEST_PATH);
1273
1274                 transactionProxy.exists(TestModel.TEST_PATH);
1275             }
1276         }, createDataTree());
1277
1278     }
1279     @Test
1280     public void testReadyThrottling(){
1281
1282         throttleOperation(new TransactionProxyOperation() {
1283             @Override
1284             public void run(TransactionProxy transactionProxy) {
1285                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1286
1287                 expectBatchedModifications(1);
1288
1289                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1290
1291                 transactionProxy.ready();
1292             }
1293         });
1294     }
1295
1296     @Test
1297     public void testReadyThrottlingWithTwoTransactionContexts(){
1298         throttleOperation(new TransactionProxyOperation() {
1299             @Override
1300             public void run(TransactionProxy transactionProxy) {
1301                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1302                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1303
1304                 expectBatchedModifications(2);
1305
1306                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1307
1308                 // Trying to write to Cars will cause another transaction context to get created
1309                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1310
1311                 // Now ready should block for both transaction contexts
1312                 transactionProxy.ready();
1313             }
1314         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1315     }
1316
1317     private void testModificationOperationBatching(TransactionType type) throws Exception {
1318         int shardBatchedModificationCount = 3;
1319         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1320
1321         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1322
1323         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1324
1325         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1326         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1327
1328         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1329         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1330
1331         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1332         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1333
1334         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1335         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1336
1337         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1338         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1339
1340         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1341         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1342
1343         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1344         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1345
1346         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1347
1348         transactionProxy.write(writePath1, writeNode1);
1349         transactionProxy.write(writePath2, writeNode2);
1350         transactionProxy.delete(deletePath1);
1351         transactionProxy.merge(mergePath1, mergeNode1);
1352         transactionProxy.merge(mergePath2, mergeNode2);
1353         transactionProxy.write(writePath3, writeNode3);
1354         transactionProxy.merge(mergePath3, mergeNode3);
1355         transactionProxy.delete(deletePath2);
1356
1357         // This sends the last batch.
1358         transactionProxy.ready();
1359
1360         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1361         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1362
1363         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1364                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1365
1366         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1367                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1368
1369         verifyBatchedModifications(batchedModifications.get(2), true, true,
1370                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1371
1372         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1373     }
1374
1375     @Test
1376     public void testReadWriteModificationOperationBatching() throws Throwable {
1377         testModificationOperationBatching(READ_WRITE);
1378     }
1379
1380     @Test
1381     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1382         testModificationOperationBatching(WRITE_ONLY);
1383     }
1384
1385     @Test
1386     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1387         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1388         testModificationOperationBatching(WRITE_ONLY);
1389     }
1390
1391     @Test
1392     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1393
1394         int shardBatchedModificationCount = 10;
1395         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1396
1397         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1398
1399         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1400
1401         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1402         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1403
1404         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1405         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1406
1407         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1408         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1409
1410         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1411         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1412
1413         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1414
1415         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1416                 eq(actorSelection(actorRef)), eqReadData(writePath2));
1417
1418         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1419                 eq(actorSelection(actorRef)), eqReadData(mergePath2));
1420
1421         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1422                 eq(actorSelection(actorRef)), eqDataExists());
1423
1424         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1425
1426         transactionProxy.write(writePath1, writeNode1);
1427         transactionProxy.write(writePath2, writeNode2);
1428
1429         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1430                 get(5, TimeUnit.SECONDS);
1431
1432         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1433         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1434
1435         transactionProxy.merge(mergePath1, mergeNode1);
1436         transactionProxy.merge(mergePath2, mergeNode2);
1437
1438         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1439
1440         transactionProxy.delete(deletePath);
1441
1442         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1443         assertEquals("Exists response", true, exists);
1444
1445         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1446         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1447
1448         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1449         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1450
1451         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1452                 new WriteModification(writePath2, writeNode2));
1453
1454         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1455                 new MergeModification(mergePath2, mergeNode2));
1456
1457         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1458
1459         InOrder inOrder = Mockito.inOrder(mockActorContext);
1460         inOrder.verify(mockActorContext).executeOperationAsync(
1461                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1462
1463         inOrder.verify(mockActorContext).executeOperationAsync(
1464                 eq(actorSelection(actorRef)), eqReadData(writePath2));
1465
1466         inOrder.verify(mockActorContext).executeOperationAsync(
1467                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1468
1469         inOrder.verify(mockActorContext).executeOperationAsync(
1470                 eq(actorSelection(actorRef)), eqReadData(mergePath2));
1471
1472         inOrder.verify(mockActorContext).executeOperationAsync(
1473                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1474
1475         inOrder.verify(mockActorContext).executeOperationAsync(
1476                 eq(actorSelection(actorRef)), eqDataExists());
1477     }
1478
1479     @Test
1480     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1481
1482         SchemaContext schemaContext = SchemaContextHelper.full();
1483         Configuration configuration = mock(Configuration.class);
1484         doReturn(configuration).when(mockActorContext).getConfiguration();
1485         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1486         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1487
1488         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1489         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1490
1491         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1492         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1493
1494         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1495
1496         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1497
1498         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1499
1500         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1501                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1502
1503         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1504
1505         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1506
1507         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1508
1509         @SuppressWarnings("unchecked")
1510         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1511
1512         for(NormalizedNode<?,?> node : collection){
1513             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1514         }
1515
1516         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1517                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1518
1519         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1520
1521         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1522                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1523
1524         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1525     }
1526
1527
1528     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1529         ActorSystem actorSystem = getSystem();
1530         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1531
1532         doReturn(getSystem().actorSelection(shardActorRef.path())).
1533                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1534
1535         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1536                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1537
1538         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1539
1540         doReturn(actorSystem.actorSelection(txActorRef.path())).
1541                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1542
1543         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1544                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1545                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1546
1547         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1548                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()));
1549     }
1550 }