BUG-5280: switch transaction IDs from String to TransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.access.concepts.MemberName;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
52 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
53 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
58 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
59 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
60 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
61 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
62 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
63 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
64 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
74 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
75 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76 import scala.concurrent.Promise;
77
78 @SuppressWarnings("resource")
79 public class TransactionProxyTest extends AbstractTransactionProxyTest {
80
81     @SuppressWarnings("serial")
82     static class TestException extends RuntimeException {
83     }
84
85     static interface Invoker {
86         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
87     }
88
89     @Test
90     public void testRead() throws Exception {
91         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
92
93         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
94
95         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
96                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
97
98         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
99                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
100
101         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
102
103         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
104
105         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
106                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
107
108         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
109
110         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
111
112         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
113     }
114
115     @Test(expected = ReadFailedException.class)
116     public void testReadWithInvalidReplyMessageType() throws Exception {
117         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
118
119         doReturn(Futures.successful(new Object())).when(mockActorContext).
120                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
121
122         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
123
124         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
125     }
126
127     @Test(expected = TestException.class)
128     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
129         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
130
131         doReturn(Futures.failed(new TestException())).when(mockActorContext).
132                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
133
134         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
135
136         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
137     }
138
139     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
140             throws Throwable {
141         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
142
143         if (exToThrow instanceof PrimaryNotFoundException) {
144             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
145         } else {
146             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
147                     when(mockActorContext).findPrimaryShardAsync(anyString());
148         }
149
150         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
151                 any(ActorSelection.class), any(), any(Timeout.class));
152
153         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
154
155         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
156     }
157
158     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
159         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
160             @Override
161             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
162                 return proxy.read(TestModel.TEST_PATH);
163             }
164         });
165     }
166
167     @Test(expected = PrimaryNotFoundException.class)
168     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
169         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
170     }
171
172     @Test(expected = TimeoutException.class)
173     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
174         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
175                 new Exception("reason")));
176     }
177
178     @Test(expected = TestException.class)
179     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
180         testReadWithExceptionOnInitialCreateTransaction(new TestException());
181     }
182
183     @Test
184     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
185         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
186
187         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
188
189         expectBatchedModifications(actorRef, 1);
190
191         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
193
194         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
195
196         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
197
198         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
199                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
200
201         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
202         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
203
204         InOrder inOrder = Mockito.inOrder(mockActorContext);
205         inOrder.verify(mockActorContext).executeOperationAsync(
206                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
207
208         inOrder.verify(mockActorContext).executeOperationAsync(
209                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
210     }
211
212     @Test(expected=IllegalStateException.class)
213     public void testReadPreConditionCheck() {
214         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
215         transactionProxy.read(TestModel.TEST_PATH);
216     }
217
218     @Test(expected=IllegalArgumentException.class)
219     public void testInvalidCreateTransactionReply() throws Throwable {
220         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
221
222         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
223             actorSelection(actorRef.path().toString());
224
225         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
226             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
227
228         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
229             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
230             any(Timeout.class));
231
232         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
233
234         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
235     }
236
237     @Test
238     public void testExists() throws Exception {
239         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
240
241         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
242
243         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
244                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
245
246         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
247
248         assertEquals("Exists response", false, exists);
249
250         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
251                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
252
253         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
254
255         assertEquals("Exists response", true, exists);
256     }
257
258     @Test(expected = PrimaryNotFoundException.class)
259     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
260         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
261             @Override
262             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
263                 return proxy.exists(TestModel.TEST_PATH);
264             }
265         });
266     }
267
268     @Test(expected = ReadFailedException.class)
269     public void testExistsWithInvalidReplyMessageType() throws Exception {
270         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
271
272         doReturn(Futures.successful(new Object())).when(mockActorContext).
273                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
274
275         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
276
277         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
278     }
279
280     @Test(expected = TestException.class)
281     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
282         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
283
284         doReturn(Futures.failed(new TestException())).when(mockActorContext).
285                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
286
287         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
288
289         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
290     }
291
292     @Test
293     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
294         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
295
296         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
297
298         expectBatchedModifications(actorRef, 1);
299
300         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
301                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
302
303         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
304
305         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
306
307         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
308
309         assertEquals("Exists response", true, exists);
310
311         InOrder inOrder = Mockito.inOrder(mockActorContext);
312         inOrder.verify(mockActorContext).executeOperationAsync(
313                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
314
315         inOrder.verify(mockActorContext).executeOperationAsync(
316                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
317     }
318
319     @Test(expected=IllegalStateException.class)
320     public void testExistsPreConditionCheck() {
321         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
322         transactionProxy.exists(TestModel.TEST_PATH);
323     }
324
325     @Test
326     public void testWrite() throws Exception {
327         dataStoreContextBuilder.shardBatchedModificationCount(1);
328         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
329
330         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
331
332         expectBatchedModifications(actorRef, 1);
333
334         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
335
336         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
337
338         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
339     }
340
341     @Test
342     public void testWriteAfterAsyncRead() throws Throwable {
343         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
344
345         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
346         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
347                 eq(getSystem().actorSelection(actorRef.path())),
348                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
349
350         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
351                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
352
353         expectBatchedModificationsReady(actorRef);
354
355         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
356
357         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
358
359         final CountDownLatch readComplete = new CountDownLatch(1);
360         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
361         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
362                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
363                     @Override
364                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
365                         try {
366                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
367                         } catch (Exception e) {
368                             caughtEx.set(e);
369                         } finally {
370                             readComplete.countDown();
371                         }
372                     }
373
374                     @Override
375                     public void onFailure(Throwable t) {
376                         caughtEx.set(t);
377                         readComplete.countDown();
378                     }
379                 });
380
381         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
382
383         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
384
385         if(caughtEx.get() != null) {
386             throw caughtEx.get();
387         }
388
389         // This sends the batched modification.
390         transactionProxy.ready();
391
392         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
393     }
394
395     @Test(expected=IllegalStateException.class)
396     public void testWritePreConditionCheck() {
397         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
398         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
399     }
400
401     @Test(expected=IllegalStateException.class)
402     public void testWriteAfterReadyPreConditionCheck() {
403         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
404
405         transactionProxy.ready();
406
407         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
408     }
409
410     @Test
411     public void testMerge() throws Exception {
412         dataStoreContextBuilder.shardBatchedModificationCount(1);
413         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
414
415         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
416
417         expectBatchedModifications(actorRef, 1);
418
419         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
420
421         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
422
423         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
424     }
425
426     @Test
427     public void testDelete() throws Exception {
428         dataStoreContextBuilder.shardBatchedModificationCount(1);
429         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
430
431         expectBatchedModifications(actorRef, 1);
432
433         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
434
435         transactionProxy.delete(TestModel.TEST_PATH);
436
437         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
438     }
439
440     @Test
441     public void testReadWrite() throws Exception {
442         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
443
444         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
445
446         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
447                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
448
449         expectBatchedModifications(actorRef, 1);
450
451         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
452
453         transactionProxy.read(TestModel.TEST_PATH);
454
455         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
456
457         transactionProxy.read(TestModel.TEST_PATH);
458
459         transactionProxy.read(TestModel.TEST_PATH);
460
461         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
462         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
463
464         verifyBatchedModifications(batchedModifications.get(0), false,
465                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
466     }
467
468     @Test
469     public void testReadyWithReadWrite() throws Exception {
470         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
471
472         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
473
474         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
475                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
476
477         expectBatchedModificationsReady(actorRef, true);
478
479         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
480
481         transactionProxy.read(TestModel.TEST_PATH);
482
483         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
484
485         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
486
487         assertTrue(ready instanceof SingleCommitCohortProxy);
488
489         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
490
491         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
492         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
493
494         verifyBatchedModifications(batchedModifications.get(0), true, true,
495                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
496
497         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
498     }
499
500     @Test
501     public void testReadyWithNoModifications() throws Exception {
502         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
503
504         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
505                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
506
507         expectBatchedModificationsReady(actorRef, true);
508
509         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
510
511         transactionProxy.read(TestModel.TEST_PATH);
512
513         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
514
515         assertTrue(ready instanceof SingleCommitCohortProxy);
516
517         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
518
519         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
520         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
521
522         verifyBatchedModifications(batchedModifications.get(0), true, true);
523     }
524
525     @Test
526     public void testReadyWithMultipleShardWrites() throws Exception {
527         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
528
529         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
530
531         expectBatchedModificationsReady(actorRef1);
532         expectBatchedModificationsReady(actorRef2);
533
534         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
535
536         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
537         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
538
539         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
540
541         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
542
543         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
544                 actorSelection(actorRef2));
545     }
546
547     @Test
548     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
549         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
550
551         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
552
553         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
554
555         expectBatchedModificationsReady(actorRef, true);
556
557         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
558
559         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
560
561         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
562
563         assertTrue(ready instanceof SingleCommitCohortProxy);
564
565         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
566
567         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
568         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
569
570         verifyBatchedModifications(batchedModifications.get(0), true, true,
571                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
572     }
573
574     @Test
575     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
576         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
577         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
578
579         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
580
581         expectBatchedModificationsReady(actorRef, true);
582
583         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
584
585         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
586
587         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
588
589         assertTrue(ready instanceof SingleCommitCohortProxy);
590
591         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
592
593         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
594         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
595
596         verifyBatchedModifications(batchedModifications.get(0), false,
597                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
598
599         verifyBatchedModifications(batchedModifications.get(1), true, true);
600     }
601
602     @Test
603     public void testReadyWithReplyFailure() throws Exception {
604         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
605
606         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
607
608         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
609
610         expectFailedBatchedModifications(actorRef);
611
612         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
613
614         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
615
616         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
617
618         assertTrue(ready instanceof SingleCommitCohortProxy);
619
620         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
621     }
622
623     @Test
624     public void testReadyWithDebugContextEnabled() throws Exception {
625         dataStoreContextBuilder.transactionDebugContextEnabled(true);
626
627         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
628
629         expectBatchedModificationsReady(actorRef, true);
630
631         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
632
633         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
634
635         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
636
637         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
638
639         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
640     }
641
642     @Test
643     public void testReadyWithLocalTransaction() throws Exception {
644         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
645
646         doReturn(getSystem().actorSelection(shardActorRef.path())).
647                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
648
649         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
650                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
651
652         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
653
654         expectReadyLocalTransaction(shardActorRef, true);
655
656         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
657         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
658
659         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
660         assertTrue(ready instanceof SingleCommitCohortProxy);
661         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
662     }
663
664     @Test
665     public void testReadyWithLocalTransactionWithFailure() throws Exception {
666         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
667
668         doReturn(getSystem().actorSelection(shardActorRef.path())).
669                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
670
671         DataTree mockDataTree = createDataTree();
672         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
673         doThrow(new RuntimeException("mock")).when(mockModification).ready();
674
675         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
676                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
677
678         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
679
680         expectReadyLocalTransaction(shardActorRef, true);
681
682         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
683         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
684
685         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
686         assertTrue(ready instanceof SingleCommitCohortProxy);
687         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
688     }
689
690     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
691         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
692
693         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
694
695         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
696
697         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
698
699         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
700
701         transactionProxy.delete(TestModel.TEST_PATH);
702
703         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
704
705         assertTrue(ready instanceof SingleCommitCohortProxy);
706
707         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
708     }
709
710     @Test
711     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
712         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
713     }
714
715     @Test
716     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
717         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
718     }
719
720     @Test
721     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
722         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
723     }
724
725     @Test
726     public void testReadyWithInvalidReplyMessageType() throws Exception {
727         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
728         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
729
730         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
731
732         doReturn(Futures.successful(new Object())).when(mockActorContext).
733                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class),
734                         any(Timeout.class));
735
736         expectBatchedModificationsReady(actorRef2);
737
738         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
739
740         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
741         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
742
743         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
744
745         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
746
747         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
748                 IllegalArgumentException.class);
749     }
750
751     @Test
752     public void testGetIdentifier() {
753         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
754         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
755
756         Object id = transactionProxy.getIdentifier();
757         assertNotNull("getIdentifier returned null", id);
758         assertTrue("Invalid identifier: " + id, id.toString().contains(MemberName.forName(memberName).toString()));
759     }
760
761     @Test
762     public void testClose() throws Exception{
763         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
764
765         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
766                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
767
768         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
769
770         transactionProxy.read(TestModel.TEST_PATH);
771
772         transactionProxy.close();
773
774         verify(mockActorContext).sendOperationAsync(
775                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
776     }
777
778     private static interface TransactionProxyOperation {
779         void run(TransactionProxy transactionProxy);
780     }
781
782     private void throttleOperation(TransactionProxyOperation operation) {
783         throttleOperation(operation, 1, true);
784     }
785
786     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
787         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
788                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
789     }
790
791     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
792         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
793     }
794
795     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree){
796         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
797                 dataTree);
798     }
799
800
801     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
802         ActorSystem actorSystem = getSystem();
803         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
804
805         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
806         // we now allow one extra permit to be allowed for ready
807         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
808                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
809
810         doReturn(actorSystem.actorSelection(shardActorRef.path())).
811                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
812
813         if(shardFound) {
814             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
815                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
816             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
817                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
818
819         } else {
820             doReturn(Futures.failed(new Exception("not found")))
821                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
822         }
823
824         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
825
826         doReturn(incompleteFuture()).when(mockActorContext).
827         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
828                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
829
830         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
831
832         long start = System.nanoTime();
833
834         operation.run(transactionProxy);
835
836         long end = System.nanoTime();
837
838         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
839                 expectedCompletionTime, (end-start)),
840                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
841
842     }
843
844     private void completeOperation(TransactionProxyOperation operation){
845         completeOperation(operation, true);
846     }
847
848     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
849         ActorSystem actorSystem = getSystem();
850         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
851
852         doReturn(actorSystem.actorSelection(shardActorRef.path())).
853                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
854
855         if(shardFound) {
856             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
857                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
858         } else {
859             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
860                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
861         }
862
863         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
864         String actorPath = txActorRef.path().toString();
865         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
866                 DataStoreVersions.CURRENT_VERSION);
867
868         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
869
870         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
871                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
872                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
873
874         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
875
876         long start = System.nanoTime();
877
878         operation.run(transactionProxy);
879
880         long end = System.nanoTime();
881
882         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
883         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
884                 expected, (end-start)), (end - start) <= expected);
885     }
886
887     private void completeOperationLocal(TransactionProxyOperation operation, DataTree dataTree){
888         ActorSystem actorSystem = getSystem();
889         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
890
891         doReturn(actorSystem.actorSelection(shardActorRef.path())).
892                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
893
894         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).
895                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
896
897         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
898
899         long start = System.nanoTime();
900
901         operation.run(transactionProxy);
902
903         long end = System.nanoTime();
904
905         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
906         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
907                 expected, (end-start)), (end - start) <= expected);
908     }
909
910     private static DataTree createDataTree(){
911         DataTree dataTree = mock(DataTree.class);
912         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
913         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
914
915         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
916         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
917
918         return dataTree;
919     }
920
921     private static DataTree createDataTree(NormalizedNode<?, ?> readResponse){
922         DataTree dataTree = mock(DataTree.class);
923         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
924         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
925
926         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
927         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
928         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
929
930         return dataTree;
931     }
932
933
934     @Test
935     public void testWriteCompletionForLocalShard(){
936         completeOperationLocal(new TransactionProxyOperation() {
937             @Override
938             public void run(TransactionProxy transactionProxy) {
939                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
940
941                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
942
943                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
944
945             }
946         }, createDataTree());
947     }
948
949     @Test
950     public void testWriteThrottlingWhenShardFound(){
951         throttleOperation(new TransactionProxyOperation() {
952             @Override
953             public void run(TransactionProxy transactionProxy) {
954                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
955
956                 expectIncompleteBatchedModifications();
957
958                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
959
960                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
961             }
962         });
963     }
964
965     @Test
966     public void testWriteThrottlingWhenShardNotFound(){
967         // Confirm that there is no throttling when the Shard is not found
968         completeOperation(new TransactionProxyOperation() {
969             @Override
970             public void run(TransactionProxy transactionProxy) {
971                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
972
973                 expectBatchedModifications(2);
974
975                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
976
977                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
978             }
979         }, false);
980
981     }
982
983
984     @Test
985     public void testWriteCompletion(){
986         completeOperation(new TransactionProxyOperation() {
987             @Override
988             public void run(TransactionProxy transactionProxy) {
989                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
990
991                 expectBatchedModifications(2);
992
993                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
994
995                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
996             }
997         });
998     }
999
1000     @Test
1001     public void testMergeThrottlingWhenShardFound(){
1002         throttleOperation(new TransactionProxyOperation() {
1003             @Override
1004             public void run(TransactionProxy transactionProxy) {
1005                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1006
1007                 expectIncompleteBatchedModifications();
1008
1009                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1010
1011                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1012             }
1013         });
1014     }
1015
1016     @Test
1017     public void testMergeThrottlingWhenShardNotFound(){
1018         completeOperation(new TransactionProxyOperation() {
1019             @Override
1020             public void run(TransactionProxy transactionProxy) {
1021                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1022
1023                 expectBatchedModifications(2);
1024
1025                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1026
1027                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1028             }
1029         }, false);
1030     }
1031
1032     @Test
1033     public void testMergeCompletion(){
1034         completeOperation(new TransactionProxyOperation() {
1035             @Override
1036             public void run(TransactionProxy transactionProxy) {
1037                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1038
1039                 expectBatchedModifications(2);
1040
1041                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1042
1043                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1044             }
1045         });
1046
1047     }
1048
1049     @Test
1050     public void testMergeCompletionForLocalShard(){
1051         completeOperationLocal(new TransactionProxyOperation() {
1052             @Override
1053             public void run(TransactionProxy transactionProxy) {
1054                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1055
1056                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1057
1058                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1059
1060             }
1061         }, createDataTree());
1062     }
1063
1064
1065     @Test
1066     public void testDeleteThrottlingWhenShardFound(){
1067
1068         throttleOperation(new TransactionProxyOperation() {
1069             @Override
1070             public void run(TransactionProxy transactionProxy) {
1071                 expectIncompleteBatchedModifications();
1072
1073                 transactionProxy.delete(TestModel.TEST_PATH);
1074
1075                 transactionProxy.delete(TestModel.TEST_PATH);
1076             }
1077         });
1078     }
1079
1080
1081     @Test
1082     public void testDeleteThrottlingWhenShardNotFound(){
1083
1084         completeOperation(new TransactionProxyOperation() {
1085             @Override
1086             public void run(TransactionProxy transactionProxy) {
1087                 expectBatchedModifications(2);
1088
1089                 transactionProxy.delete(TestModel.TEST_PATH);
1090
1091                 transactionProxy.delete(TestModel.TEST_PATH);
1092             }
1093         }, false);
1094     }
1095
1096     @Test
1097     public void testDeleteCompletionForLocalShard(){
1098         completeOperationLocal(new TransactionProxyOperation() {
1099             @Override
1100             public void run(TransactionProxy transactionProxy) {
1101
1102                 transactionProxy.delete(TestModel.TEST_PATH);
1103
1104                 transactionProxy.delete(TestModel.TEST_PATH);
1105             }
1106         }, createDataTree());
1107
1108     }
1109
1110     @Test
1111     public void testDeleteCompletion(){
1112         completeOperation(new TransactionProxyOperation() {
1113             @Override
1114             public void run(TransactionProxy transactionProxy) {
1115                 expectBatchedModifications(2);
1116
1117                 transactionProxy.delete(TestModel.TEST_PATH);
1118
1119                 transactionProxy.delete(TestModel.TEST_PATH);
1120             }
1121         });
1122
1123     }
1124
1125     @Test
1126     public void testReadThrottlingWhenShardFound(){
1127
1128         throttleOperation(new TransactionProxyOperation() {
1129             @Override
1130             public void run(TransactionProxy transactionProxy) {
1131                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1132                         any(ActorSelection.class), eqReadData());
1133
1134                 transactionProxy.read(TestModel.TEST_PATH);
1135
1136                 transactionProxy.read(TestModel.TEST_PATH);
1137             }
1138         });
1139     }
1140
1141     @Test
1142     public void testReadThrottlingWhenShardNotFound(){
1143
1144         completeOperation(new TransactionProxyOperation() {
1145             @Override
1146             public void run(TransactionProxy transactionProxy) {
1147                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1148                         any(ActorSelection.class), eqReadData());
1149
1150                 transactionProxy.read(TestModel.TEST_PATH);
1151
1152                 transactionProxy.read(TestModel.TEST_PATH);
1153             }
1154         }, false);
1155     }
1156
1157
1158     @Test
1159     public void testReadCompletion(){
1160         completeOperation(new TransactionProxyOperation() {
1161             @Override
1162             public void run(TransactionProxy transactionProxy) {
1163                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1164
1165                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1166                         any(ActorSelection.class), eqReadData(), any(Timeout.class));
1167
1168                 transactionProxy.read(TestModel.TEST_PATH);
1169
1170                 transactionProxy.read(TestModel.TEST_PATH);
1171             }
1172         });
1173
1174     }
1175
1176     @Test
1177     public void testReadCompletionForLocalShard(){
1178         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1179         completeOperationLocal(new TransactionProxyOperation() {
1180             @Override
1181             public void run(TransactionProxy transactionProxy) {
1182                 transactionProxy.read(TestModel.TEST_PATH);
1183
1184                 transactionProxy.read(TestModel.TEST_PATH);
1185             }
1186         }, createDataTree(nodeToRead));
1187
1188     }
1189
1190     @Test
1191     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1192         completeOperationLocal(new TransactionProxyOperation() {
1193             @Override
1194             public void run(TransactionProxy transactionProxy) {
1195                 transactionProxy.read(TestModel.TEST_PATH);
1196
1197                 transactionProxy.read(TestModel.TEST_PATH);
1198             }
1199         }, createDataTree());
1200
1201     }
1202
1203     @Test
1204     public void testExistsThrottlingWhenShardFound(){
1205
1206         throttleOperation(new TransactionProxyOperation() {
1207             @Override
1208             public void run(TransactionProxy transactionProxy) {
1209                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1210                         any(ActorSelection.class), eqDataExists());
1211
1212                 transactionProxy.exists(TestModel.TEST_PATH);
1213
1214                 transactionProxy.exists(TestModel.TEST_PATH);
1215             }
1216         });
1217     }
1218
1219     @Test
1220     public void testExistsThrottlingWhenShardNotFound(){
1221
1222         completeOperation(new TransactionProxyOperation() {
1223             @Override
1224             public void run(TransactionProxy transactionProxy) {
1225                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1226                         any(ActorSelection.class), eqDataExists());
1227
1228                 transactionProxy.exists(TestModel.TEST_PATH);
1229
1230                 transactionProxy.exists(TestModel.TEST_PATH);
1231             }
1232         }, false);
1233     }
1234
1235
1236     @Test
1237     public void testExistsCompletion(){
1238         completeOperation(new TransactionProxyOperation() {
1239             @Override
1240             public void run(TransactionProxy transactionProxy) {
1241                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1242                         any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1243
1244                 transactionProxy.exists(TestModel.TEST_PATH);
1245
1246                 transactionProxy.exists(TestModel.TEST_PATH);
1247             }
1248         });
1249
1250     }
1251
1252     @Test
1253     public void testExistsCompletionForLocalShard(){
1254         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1255         completeOperationLocal(new TransactionProxyOperation() {
1256             @Override
1257             public void run(TransactionProxy transactionProxy) {
1258                 transactionProxy.exists(TestModel.TEST_PATH);
1259
1260                 transactionProxy.exists(TestModel.TEST_PATH);
1261             }
1262         }, createDataTree(nodeToRead));
1263
1264     }
1265
1266     @Test
1267     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1268         completeOperationLocal(new TransactionProxyOperation() {
1269             @Override
1270             public void run(TransactionProxy transactionProxy) {
1271                 transactionProxy.exists(TestModel.TEST_PATH);
1272
1273                 transactionProxy.exists(TestModel.TEST_PATH);
1274             }
1275         }, createDataTree());
1276
1277     }
1278     @Test
1279     public void testReadyThrottling(){
1280
1281         throttleOperation(new TransactionProxyOperation() {
1282             @Override
1283             public void run(TransactionProxy transactionProxy) {
1284                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1285
1286                 expectBatchedModifications(1);
1287
1288                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1289
1290                 transactionProxy.ready();
1291             }
1292         });
1293     }
1294
1295     @Test
1296     public void testReadyThrottlingWithTwoTransactionContexts(){
1297         throttleOperation(new TransactionProxyOperation() {
1298             @Override
1299             public void run(TransactionProxy transactionProxy) {
1300                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1301                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1302
1303                 expectBatchedModifications(2);
1304
1305                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1306
1307                 // Trying to write to Cars will cause another transaction context to get created
1308                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1309
1310                 // Now ready should block for both transaction contexts
1311                 transactionProxy.ready();
1312             }
1313         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1314     }
1315
1316     private void testModificationOperationBatching(TransactionType type) throws Exception {
1317         int shardBatchedModificationCount = 3;
1318         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1319
1320         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1321
1322         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1323
1324         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1325         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1326
1327         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1328         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1329
1330         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1331         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1332
1333         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1334         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1335
1336         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1337         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1338
1339         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1340         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1341
1342         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1343         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1344
1345         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1346
1347         transactionProxy.write(writePath1, writeNode1);
1348         transactionProxy.write(writePath2, writeNode2);
1349         transactionProxy.delete(deletePath1);
1350         transactionProxy.merge(mergePath1, mergeNode1);
1351         transactionProxy.merge(mergePath2, mergeNode2);
1352         transactionProxy.write(writePath3, writeNode3);
1353         transactionProxy.merge(mergePath3, mergeNode3);
1354         transactionProxy.delete(deletePath2);
1355
1356         // This sends the last batch.
1357         transactionProxy.ready();
1358
1359         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1360         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1361
1362         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1363                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1364
1365         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1366                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1367
1368         verifyBatchedModifications(batchedModifications.get(2), true, true,
1369                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1370
1371         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1372     }
1373
1374     @Test
1375     public void testReadWriteModificationOperationBatching() throws Throwable {
1376         testModificationOperationBatching(READ_WRITE);
1377     }
1378
1379     @Test
1380     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1381         testModificationOperationBatching(WRITE_ONLY);
1382     }
1383
1384     @Test
1385     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1386         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1387         testModificationOperationBatching(WRITE_ONLY);
1388     }
1389
1390     @Test
1391     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1392
1393         int shardBatchedModificationCount = 10;
1394         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1395
1396         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1397
1398         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1399
1400         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1401         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1402
1403         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1404         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1405
1406         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1407         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1408
1409         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1410         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1411
1412         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1413
1414         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1415                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1416
1417         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1418                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1419
1420         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1421                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1422
1423         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1424
1425         transactionProxy.write(writePath1, writeNode1);
1426         transactionProxy.write(writePath2, writeNode2);
1427
1428         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1429                 get(5, TimeUnit.SECONDS);
1430
1431         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1432         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1433
1434         transactionProxy.merge(mergePath1, mergeNode1);
1435         transactionProxy.merge(mergePath2, mergeNode2);
1436
1437         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1438
1439         transactionProxy.delete(deletePath);
1440
1441         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1442         assertEquals("Exists response", true, exists);
1443
1444         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1445         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1446
1447         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1448         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1449
1450         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1451                 new WriteModification(writePath2, writeNode2));
1452
1453         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1454                 new MergeModification(mergePath2, mergeNode2));
1455
1456         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1457
1458         InOrder inOrder = Mockito.inOrder(mockActorContext);
1459         inOrder.verify(mockActorContext).executeOperationAsync(
1460                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1461
1462         inOrder.verify(mockActorContext).executeOperationAsync(
1463                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1464
1465         inOrder.verify(mockActorContext).executeOperationAsync(
1466                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1467
1468         inOrder.verify(mockActorContext).executeOperationAsync(
1469                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1470
1471         inOrder.verify(mockActorContext).executeOperationAsync(
1472                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1473
1474         inOrder.verify(mockActorContext).executeOperationAsync(
1475                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1476     }
1477
1478     @Test
1479     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1480
1481         SchemaContext schemaContext = SchemaContextHelper.full();
1482         Configuration configuration = mock(Configuration.class);
1483         doReturn(configuration).when(mockActorContext).getConfiguration();
1484         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1485         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1486
1487         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1488         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1489
1490         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1491         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1492
1493         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1494
1495         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1496
1497         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1498
1499         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1500                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1501
1502         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1503
1504         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1505
1506         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1507
1508         @SuppressWarnings("unchecked")
1509         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1510
1511         for(NormalizedNode<?,?> node : collection){
1512             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1513         }
1514
1515         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1516                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1517
1518         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1519
1520         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1521                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1522
1523         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1524     }
1525
1526
1527     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1528         ActorSystem actorSystem = getSystem();
1529         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1530
1531         doReturn(getSystem().actorSelection(shardActorRef.path())).
1532                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1533
1534         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1535                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1536
1537         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1538
1539         doReturn(actorSystem.actorSelection(txActorRef.path())).
1540                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1541
1542         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1543                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1544                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1545
1546         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1547                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1548     }
1549 }