31cde96db6b515c518e24c06fb8f21b6bbbaac0b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
52 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
61 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
62 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571     }
572
573     @Test
574     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
575         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
576         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577
578         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579
580         expectBatchedModificationsReady(actorRef, true);
581
582         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583
584         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585
586         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587
588         assertTrue(ready instanceof SingleCommitCohortProxy);
589
590         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591
592         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
593         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
594
595         verifyBatchedModifications(batchedModifications.get(0), false,
596                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
597
598         verifyBatchedModifications(batchedModifications.get(1), true, true);
599     }
600
601     @Test
602     public void testReadyWithReplyFailure() throws Exception {
603         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604
605         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606
607         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608
609         expectFailedBatchedModifications(actorRef);
610
611         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616
617         assertTrue(ready instanceof SingleCommitCohortProxy);
618
619         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
620     }
621
622     @Test
623     public void testReadyWithDebugContextEnabled() throws Exception {
624         dataStoreContextBuilder.transactionDebugContextEnabled(true);
625
626         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631
632         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637
638         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
639     }
640
641     @Test
642     public void testReadyWithLocalTransaction() throws Exception {
643         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644
645         doReturn(getSystem().actorSelection(shardActorRef.path())).
646                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
647
648         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
649                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         expectReadyLocalTransaction(shardActorRef, true);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
659         assertTrue(ready instanceof SingleCommitCohortProxy);
660         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
661     }
662
663     @Test
664     public void testReadyWithLocalTransactionWithFailure() throws Exception {
665         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666
667         doReturn(getSystem().actorSelection(shardActorRef.path())).
668                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
669
670         Optional<DataTree> mockDataTree = createDataTree();
671         DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification();
672         doThrow(new RuntimeException("mock")).when(mockModification).ready();
673
674         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
675                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676
677         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678
679         expectReadyLocalTransaction(shardActorRef, true);
680
681         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
682         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683
684         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685         assertTrue(ready instanceof SingleCommitCohortProxy);
686         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
687     }
688
689     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
690         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691
692         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693
694         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695
696         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699
700         transactionProxy.delete(TestModel.TEST_PATH);
701
702         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705
706         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
707     }
708
709     @Test
710     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
711         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
712     }
713
714     @Test
715     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
716         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
717     }
718
719     @Test
720     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
721         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
722     }
723
724     @Test
725     public void testReadyWithInvalidReplyMessageType() throws Exception {
726         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
727         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728
729         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
730
731         doReturn(Futures.successful(new Object())).when(mockActorContext).
732                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class),
733                         any(Timeout.class));
734
735         expectBatchedModificationsReady(actorRef2);
736
737         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
738
739         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
740         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
741
742         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
743
744         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
745
746         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
747                 IllegalArgumentException.class);
748     }
749
750     @Test
751     public void testGetIdentifier() {
752         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
753         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
754
755         Object id = transactionProxy.getIdentifier();
756         assertNotNull("getIdentifier returned null", id);
757         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
758     }
759
760     @Test
761     public void testClose() throws Exception{
762         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
763
764         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
765                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
766
767         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
768
769         transactionProxy.read(TestModel.TEST_PATH);
770
771         transactionProxy.close();
772
773         verify(mockActorContext).sendOperationAsync(
774                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
775     }
776
777     private static interface TransactionProxyOperation {
778         void run(TransactionProxy transactionProxy);
779     }
780
781     private void throttleOperation(TransactionProxyOperation operation) {
782         throttleOperation(operation, 1, true);
783     }
784
785     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
786         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
787                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
788     }
789
790     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
791         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
792                 Optional.<DataTree>absent());
793     }
794
795     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
796         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
797                 dataTreeOptional);
798     }
799
800
801     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
802         ActorSystem actorSystem = getSystem();
803         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
804
805         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
806         // we now allow one extra permit to be allowed for ready
807         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
808                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
809
810         doReturn(actorSystem.actorSelection(shardActorRef.path())).
811                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
812
813         if(shardFound) {
814             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
815                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
816             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
817                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
818
819         } else {
820             doReturn(Futures.failed(new Exception("not found")))
821                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
822         }
823
824         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
825
826         doReturn(incompleteFuture()).when(mockActorContext).
827         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
828                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
829
830         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
831
832         long start = System.nanoTime();
833
834         operation.run(transactionProxy);
835
836         long end = System.nanoTime();
837
838         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
839                 expectedCompletionTime, (end-start)),
840                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
841
842     }
843
844     private void completeOperation(TransactionProxyOperation operation){
845         completeOperation(operation, true);
846     }
847
848     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
849         ActorSystem actorSystem = getSystem();
850         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
851
852         doReturn(actorSystem.actorSelection(shardActorRef.path())).
853                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
854
855         if(shardFound) {
856             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
857                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
858         } else {
859             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
860                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
861         }
862
863         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
864         String actorPath = txActorRef.path().toString();
865         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
866                 DataStoreVersions.CURRENT_VERSION);
867
868         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
869
870         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
871                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
872                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
873
874         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
875
876         long start = System.nanoTime();
877
878         operation.run(transactionProxy);
879
880         long end = System.nanoTime();
881
882         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
883         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
884                 expected, (end-start)), (end - start) <= expected);
885     }
886
887     private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
888         ActorSystem actorSystem = getSystem();
889         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
890
891         doReturn(actorSystem.actorSelection(shardActorRef.path())).
892                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
893
894         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
895                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
896
897         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
898
899         long start = System.nanoTime();
900
901         operation.run(transactionProxy);
902
903         long end = System.nanoTime();
904
905         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
906         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
907                 expected, (end-start)), (end - start) <= expected);
908     }
909
910     private static Optional<DataTree> createDataTree(){
911         DataTree dataTree = mock(DataTree.class);
912         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
913         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
914         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
915
916         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
917         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
918
919         return dataTreeOptional;
920     }
921
922     private static Optional<DataTree> createDataTree(NormalizedNode<?, ?> readResponse){
923         DataTree dataTree = mock(DataTree.class);
924         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
925         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
926         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
927
928         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
929         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
930         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
931
932         return dataTreeOptional;
933     }
934
935
936     @Test
937     public void testWriteCompletionForLocalShard(){
938         completeOperationLocal(new TransactionProxyOperation() {
939             @Override
940             public void run(TransactionProxy transactionProxy) {
941                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
942
943                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
944
945                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
946
947             }
948         }, createDataTree());
949     }
950
951     @Test
952     public void testWriteThrottlingWhenShardFound(){
953         throttleOperation(new TransactionProxyOperation() {
954             @Override
955             public void run(TransactionProxy transactionProxy) {
956                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
957
958                 expectIncompleteBatchedModifications();
959
960                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
961
962                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
963             }
964         });
965     }
966
967     @Test
968     public void testWriteThrottlingWhenShardNotFound(){
969         // Confirm that there is no throttling when the Shard is not found
970         completeOperation(new TransactionProxyOperation() {
971             @Override
972             public void run(TransactionProxy transactionProxy) {
973                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
974
975                 expectBatchedModifications(2);
976
977                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
978
979                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
980             }
981         }, false);
982
983     }
984
985
986     @Test
987     public void testWriteCompletion(){
988         completeOperation(new TransactionProxyOperation() {
989             @Override
990             public void run(TransactionProxy transactionProxy) {
991                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992
993                 expectBatchedModifications(2);
994
995                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
996
997                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
998             }
999         });
1000     }
1001
1002     @Test
1003     public void testMergeThrottlingWhenShardFound(){
1004         throttleOperation(new TransactionProxyOperation() {
1005             @Override
1006             public void run(TransactionProxy transactionProxy) {
1007                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1008
1009                 expectIncompleteBatchedModifications();
1010
1011                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1012
1013                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1014             }
1015         });
1016     }
1017
1018     @Test
1019     public void testMergeThrottlingWhenShardNotFound(){
1020         completeOperation(new TransactionProxyOperation() {
1021             @Override
1022             public void run(TransactionProxy transactionProxy) {
1023                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1024
1025                 expectBatchedModifications(2);
1026
1027                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1028
1029                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1030             }
1031         }, false);
1032     }
1033
1034     @Test
1035     public void testMergeCompletion(){
1036         completeOperation(new TransactionProxyOperation() {
1037             @Override
1038             public void run(TransactionProxy transactionProxy) {
1039                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1040
1041                 expectBatchedModifications(2);
1042
1043                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1044
1045                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1046             }
1047         });
1048
1049     }
1050
1051     @Test
1052     public void testMergeCompletionForLocalShard(){
1053         completeOperationLocal(new TransactionProxyOperation() {
1054             @Override
1055             public void run(TransactionProxy transactionProxy) {
1056                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1057
1058                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1059
1060                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1061
1062             }
1063         }, createDataTree());
1064     }
1065
1066
1067     @Test
1068     public void testDeleteThrottlingWhenShardFound(){
1069
1070         throttleOperation(new TransactionProxyOperation() {
1071             @Override
1072             public void run(TransactionProxy transactionProxy) {
1073                 expectIncompleteBatchedModifications();
1074
1075                 transactionProxy.delete(TestModel.TEST_PATH);
1076
1077                 transactionProxy.delete(TestModel.TEST_PATH);
1078             }
1079         });
1080     }
1081
1082
1083     @Test
1084     public void testDeleteThrottlingWhenShardNotFound(){
1085
1086         completeOperation(new TransactionProxyOperation() {
1087             @Override
1088             public void run(TransactionProxy transactionProxy) {
1089                 expectBatchedModifications(2);
1090
1091                 transactionProxy.delete(TestModel.TEST_PATH);
1092
1093                 transactionProxy.delete(TestModel.TEST_PATH);
1094             }
1095         }, false);
1096     }
1097
1098     @Test
1099     public void testDeleteCompletionForLocalShard(){
1100         completeOperationLocal(new TransactionProxyOperation() {
1101             @Override
1102             public void run(TransactionProxy transactionProxy) {
1103
1104                 transactionProxy.delete(TestModel.TEST_PATH);
1105
1106                 transactionProxy.delete(TestModel.TEST_PATH);
1107             }
1108         }, createDataTree());
1109
1110     }
1111
1112     @Test
1113     public void testDeleteCompletion(){
1114         completeOperation(new TransactionProxyOperation() {
1115             @Override
1116             public void run(TransactionProxy transactionProxy) {
1117                 expectBatchedModifications(2);
1118
1119                 transactionProxy.delete(TestModel.TEST_PATH);
1120
1121                 transactionProxy.delete(TestModel.TEST_PATH);
1122             }
1123         });
1124
1125     }
1126
1127     @Test
1128     public void testReadThrottlingWhenShardFound(){
1129
1130         throttleOperation(new TransactionProxyOperation() {
1131             @Override
1132             public void run(TransactionProxy transactionProxy) {
1133                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1134                         any(ActorSelection.class), eqReadData());
1135
1136                 transactionProxy.read(TestModel.TEST_PATH);
1137
1138                 transactionProxy.read(TestModel.TEST_PATH);
1139             }
1140         });
1141     }
1142
1143     @Test
1144     public void testReadThrottlingWhenShardNotFound(){
1145
1146         completeOperation(new TransactionProxyOperation() {
1147             @Override
1148             public void run(TransactionProxy transactionProxy) {
1149                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1150                         any(ActorSelection.class), eqReadData());
1151
1152                 transactionProxy.read(TestModel.TEST_PATH);
1153
1154                 transactionProxy.read(TestModel.TEST_PATH);
1155             }
1156         }, false);
1157     }
1158
1159
1160     @Test
1161     public void testReadCompletion(){
1162         completeOperation(new TransactionProxyOperation() {
1163             @Override
1164             public void run(TransactionProxy transactionProxy) {
1165                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1166
1167                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1168                         any(ActorSelection.class), eqReadData(), any(Timeout.class));
1169
1170                 transactionProxy.read(TestModel.TEST_PATH);
1171
1172                 transactionProxy.read(TestModel.TEST_PATH);
1173             }
1174         });
1175
1176     }
1177
1178     @Test
1179     public void testReadCompletionForLocalShard(){
1180         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1181         completeOperationLocal(new TransactionProxyOperation() {
1182             @Override
1183             public void run(TransactionProxy transactionProxy) {
1184                 transactionProxy.read(TestModel.TEST_PATH);
1185
1186                 transactionProxy.read(TestModel.TEST_PATH);
1187             }
1188         }, createDataTree(nodeToRead));
1189
1190     }
1191
1192     @Test
1193     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1194         completeOperationLocal(new TransactionProxyOperation() {
1195             @Override
1196             public void run(TransactionProxy transactionProxy) {
1197                 transactionProxy.read(TestModel.TEST_PATH);
1198
1199                 transactionProxy.read(TestModel.TEST_PATH);
1200             }
1201         }, createDataTree());
1202
1203     }
1204
1205     @Test
1206     public void testExistsThrottlingWhenShardFound(){
1207
1208         throttleOperation(new TransactionProxyOperation() {
1209             @Override
1210             public void run(TransactionProxy transactionProxy) {
1211                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1212                         any(ActorSelection.class), eqDataExists());
1213
1214                 transactionProxy.exists(TestModel.TEST_PATH);
1215
1216                 transactionProxy.exists(TestModel.TEST_PATH);
1217             }
1218         });
1219     }
1220
1221     @Test
1222     public void testExistsThrottlingWhenShardNotFound(){
1223
1224         completeOperation(new TransactionProxyOperation() {
1225             @Override
1226             public void run(TransactionProxy transactionProxy) {
1227                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1228                         any(ActorSelection.class), eqDataExists());
1229
1230                 transactionProxy.exists(TestModel.TEST_PATH);
1231
1232                 transactionProxy.exists(TestModel.TEST_PATH);
1233             }
1234         }, false);
1235     }
1236
1237
1238     @Test
1239     public void testExistsCompletion(){
1240         completeOperation(new TransactionProxyOperation() {
1241             @Override
1242             public void run(TransactionProxy transactionProxy) {
1243                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1244                         any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1245
1246                 transactionProxy.exists(TestModel.TEST_PATH);
1247
1248                 transactionProxy.exists(TestModel.TEST_PATH);
1249             }
1250         });
1251
1252     }
1253
1254     @Test
1255     public void testExistsCompletionForLocalShard(){
1256         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1257         completeOperationLocal(new TransactionProxyOperation() {
1258             @Override
1259             public void run(TransactionProxy transactionProxy) {
1260                 transactionProxy.exists(TestModel.TEST_PATH);
1261
1262                 transactionProxy.exists(TestModel.TEST_PATH);
1263             }
1264         }, createDataTree(nodeToRead));
1265
1266     }
1267
1268     @Test
1269     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1270         completeOperationLocal(new TransactionProxyOperation() {
1271             @Override
1272             public void run(TransactionProxy transactionProxy) {
1273                 transactionProxy.exists(TestModel.TEST_PATH);
1274
1275                 transactionProxy.exists(TestModel.TEST_PATH);
1276             }
1277         }, createDataTree());
1278
1279     }
1280     @Test
1281     public void testReadyThrottling(){
1282
1283         throttleOperation(new TransactionProxyOperation() {
1284             @Override
1285             public void run(TransactionProxy transactionProxy) {
1286                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1287
1288                 expectBatchedModifications(1);
1289
1290                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1291
1292                 transactionProxy.ready();
1293             }
1294         });
1295     }
1296
1297     @Test
1298     public void testReadyThrottlingWithTwoTransactionContexts(){
1299         throttleOperation(new TransactionProxyOperation() {
1300             @Override
1301             public void run(TransactionProxy transactionProxy) {
1302                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1303                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1304
1305                 expectBatchedModifications(2);
1306
1307                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1308
1309                 // Trying to write to Cars will cause another transaction context to get created
1310                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1311
1312                 // Now ready should block for both transaction contexts
1313                 transactionProxy.ready();
1314             }
1315         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1316     }
1317
1318     private void testModificationOperationBatching(TransactionType type) throws Exception {
1319         int shardBatchedModificationCount = 3;
1320         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1321
1322         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1323
1324         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1325
1326         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1327         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1328
1329         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1330         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1331
1332         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1333         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1334
1335         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1336         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1337
1338         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1339         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1340
1341         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1342         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1343
1344         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1345         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1346
1347         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1348
1349         transactionProxy.write(writePath1, writeNode1);
1350         transactionProxy.write(writePath2, writeNode2);
1351         transactionProxy.delete(deletePath1);
1352         transactionProxy.merge(mergePath1, mergeNode1);
1353         transactionProxy.merge(mergePath2, mergeNode2);
1354         transactionProxy.write(writePath3, writeNode3);
1355         transactionProxy.merge(mergePath3, mergeNode3);
1356         transactionProxy.delete(deletePath2);
1357
1358         // This sends the last batch.
1359         transactionProxy.ready();
1360
1361         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1362         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1363
1364         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1365                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1366
1367         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1368                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1369
1370         verifyBatchedModifications(batchedModifications.get(2), true, true,
1371                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1372
1373         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1374     }
1375
1376     @Test
1377     public void testReadWriteModificationOperationBatching() throws Throwable {
1378         testModificationOperationBatching(READ_WRITE);
1379     }
1380
1381     @Test
1382     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1383         testModificationOperationBatching(WRITE_ONLY);
1384     }
1385
1386     @Test
1387     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1388         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1389         testModificationOperationBatching(WRITE_ONLY);
1390     }
1391
1392     @Test
1393     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1394
1395         int shardBatchedModificationCount = 10;
1396         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1397
1398         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1399
1400         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1401
1402         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1403         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1404
1405         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1406         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1407
1408         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1409         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1410
1411         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1412         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1413
1414         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1415
1416         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1417                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1418
1419         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1420                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1421
1422         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1423                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1424
1425         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1426
1427         transactionProxy.write(writePath1, writeNode1);
1428         transactionProxy.write(writePath2, writeNode2);
1429
1430         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1431                 get(5, TimeUnit.SECONDS);
1432
1433         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1434         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1435
1436         transactionProxy.merge(mergePath1, mergeNode1);
1437         transactionProxy.merge(mergePath2, mergeNode2);
1438
1439         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1440
1441         transactionProxy.delete(deletePath);
1442
1443         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1444         assertEquals("Exists response", true, exists);
1445
1446         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1447         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1448
1449         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1450         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1451
1452         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1453                 new WriteModification(writePath2, writeNode2));
1454
1455         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1456                 new MergeModification(mergePath2, mergeNode2));
1457
1458         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1459
1460         InOrder inOrder = Mockito.inOrder(mockActorContext);
1461         inOrder.verify(mockActorContext).executeOperationAsync(
1462                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1463
1464         inOrder.verify(mockActorContext).executeOperationAsync(
1465                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1466
1467         inOrder.verify(mockActorContext).executeOperationAsync(
1468                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1469
1470         inOrder.verify(mockActorContext).executeOperationAsync(
1471                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1472
1473         inOrder.verify(mockActorContext).executeOperationAsync(
1474                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1475
1476         inOrder.verify(mockActorContext).executeOperationAsync(
1477                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1478     }
1479
1480     @Test
1481     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1482
1483         SchemaContext schemaContext = SchemaContextHelper.full();
1484         Configuration configuration = mock(Configuration.class);
1485         doReturn(configuration).when(mockActorContext).getConfiguration();
1486         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1487         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1488
1489         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1490         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1491
1492         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1493         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1494
1495         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1496
1497         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1498
1499         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1500
1501         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1502                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1503
1504         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1505
1506         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1507
1508         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1509
1510         @SuppressWarnings("unchecked")
1511         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1512
1513         for(NormalizedNode<?,?> node : collection){
1514             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1515         }
1516
1517         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1518                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1519
1520         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1521
1522         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1523                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1524
1525         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1526     }
1527
1528
1529     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1530         ActorSystem actorSystem = getSystem();
1531         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1532
1533         doReturn(getSystem().actorSelection(shardActorRef.path())).
1534                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1535
1536         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1537                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1538
1539         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1540
1541         doReturn(actorSystem.actorSelection(txActorRef.path())).
1542                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1543
1544         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1545                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1546                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1547
1548         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1549                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()), any(Timeout.class));
1550     }
1551 }