BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collection;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.Assert;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.Mockito;
48 import org.opendaylight.controller.cluster.access.concepts.MemberName;
49 import org.opendaylight.controller.cluster.datastore.config.Configuration;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
62 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
63 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
64 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
65 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
68 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
76 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
77 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
78 import scala.concurrent.Promise;
79
80 @SuppressWarnings("resource")
81 public class TransactionProxyTest extends AbstractTransactionProxyTest {
82
83     @SuppressWarnings("serial")
84     static class TestException extends RuntimeException {
85     }
86
87     interface Invoker {
88         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
89     }
90
91     @Test
92     public void testRead() throws Exception {
93         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
94
95         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
96
97         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
98                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
99
100         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
101                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
102
103         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
104
105         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
106
107         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
108                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
109
110         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
111
112         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
113
114         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
115     }
116
117     @Test(expected = ReadFailedException.class)
118     public void testReadWithInvalidReplyMessageType() throws Exception {
119         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
120
121         doReturn(Futures.successful(new Object())).when(mockActorContext)
122                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
123
124         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
125
126         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
127     }
128
129     @Test(expected = TestException.class)
130     public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
131         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
132
133         doReturn(Futures.failed(new TestException())).when(mockActorContext)
134                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
135
136         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
137
138         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
139     }
140
141     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Exception {
142         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
143
144         if (exToThrow instanceof PrimaryNotFoundException) {
145             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
146         } else {
147             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
148                     .findPrimaryShardAsync(anyString());
149         }
150
151         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
152                 any(ActorSelection.class), any(), any(Timeout.class));
153
154         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
155
156         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
157     }
158
159     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Exception {
160         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
161     }
162
163     @Test(expected = PrimaryNotFoundException.class)
164     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
165         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
166     }
167
168     @Test(expected = TimeoutException.class)
169     public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
170         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
171                 new Exception("reason")));
172     }
173
174     @Test(expected = TestException.class)
175     public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
176         testReadWithExceptionOnInitialCreateTransaction(new TestException());
177     }
178
179     @Test
180     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
181         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
182
183         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
184
185         expectBatchedModifications(actorRef, 1);
186
187         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
188                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
189
190         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
191
192         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
193
194         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
195                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
196
197         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
198         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
199
200         InOrder inOrder = Mockito.inOrder(mockActorContext);
201         inOrder.verify(mockActorContext).executeOperationAsync(
202                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
203
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
206     }
207
208     @Test(expected = IllegalStateException.class)
209     public void testReadPreConditionCheck() {
210         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
211         transactionProxy.read(TestModel.TEST_PATH);
212     }
213
214     @Test(expected = IllegalArgumentException.class)
215     public void testInvalidCreateTransactionReply() throws Exception {
216         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
217
218         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
219                 .actorSelection(actorRef.path().toString());
220
221         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
222                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
223
224         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
225             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
226             any(Timeout.class));
227
228         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
229
230         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
231     }
232
233     @Test
234     public void testExists() throws Exception {
235         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
236
237         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
238
239         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
240                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
241
242         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
243
244         assertEquals("Exists response", false, exists);
245
246         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
247                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
248
249         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
250
251         assertEquals("Exists response", true, exists);
252     }
253
254     @Test(expected = PrimaryNotFoundException.class)
255     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
256         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
257             proxy -> proxy.exists(TestModel.TEST_PATH));
258     }
259
260     @Test(expected = ReadFailedException.class)
261     public void testExistsWithInvalidReplyMessageType() throws Exception {
262         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
263
264         doReturn(Futures.successful(new Object())).when(mockActorContext)
265                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
266
267         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
268
269         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
270     }
271
272     @Test(expected = TestException.class)
273     public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
274         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
275
276         doReturn(Futures.failed(new TestException())).when(mockActorContext)
277                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
278
279         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
280
281         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
282     }
283
284     @Test
285     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
286         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
287
288         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
289
290         expectBatchedModifications(actorRef, 1);
291
292         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
293                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
294
295         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
296
297         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
298
299         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
300
301         assertEquals("Exists response", true, exists);
302
303         InOrder inOrder = Mockito.inOrder(mockActorContext);
304         inOrder.verify(mockActorContext).executeOperationAsync(
305                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
306
307         inOrder.verify(mockActorContext).executeOperationAsync(
308                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
309     }
310
311     @Test(expected = IllegalStateException.class)
312     public void testExistsPreConditionCheck() {
313         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
314         transactionProxy.exists(TestModel.TEST_PATH);
315     }
316
317     @Test
318     public void testWrite() throws Exception {
319         dataStoreContextBuilder.shardBatchedModificationCount(1);
320         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
321
322         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
323
324         expectBatchedModifications(actorRef, 1);
325
326         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
327
328         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
329
330         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
331     }
332
333     @Test
334     @SuppressWarnings("checkstyle:IllegalCatch")
335     public void testWriteAfterAsyncRead() throws Exception {
336         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
337                 DefaultShardStrategy.DEFAULT_SHARD);
338
339         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
340         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
341                 eq(getSystem().actorSelection(actorRef.path())),
342                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
343
344         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
345                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
346
347         expectBatchedModificationsReady(actorRef);
348
349         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
350
351         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
352
353         final CountDownLatch readComplete = new CountDownLatch(1);
354         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
355         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
356                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
357                     @Override
358                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
359                         try {
360                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
361                         } catch (Exception e) {
362                             caughtEx.set(e);
363                         } finally {
364                             readComplete.countDown();
365                         }
366                     }
367
368                     @Override
369                     public void onFailure(Throwable failure) {
370                         caughtEx.set(failure);
371                         readComplete.countDown();
372                     }
373                 });
374
375         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
376
377         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
378
379         if (caughtEx.get() != null) {
380             Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
381             Throwables.propagate(caughtEx.get());
382         }
383
384         // This sends the batched modification.
385         transactionProxy.ready();
386
387         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
388     }
389
390     @Test(expected = IllegalStateException.class)
391     public void testWritePreConditionCheck() {
392         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
393         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
394     }
395
396     @Test(expected = IllegalStateException.class)
397     public void testWriteAfterReadyPreConditionCheck() {
398         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
399
400         transactionProxy.ready();
401
402         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
403     }
404
405     @Test
406     public void testMerge() throws Exception {
407         dataStoreContextBuilder.shardBatchedModificationCount(1);
408         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
409
410         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
411
412         expectBatchedModifications(actorRef, 1);
413
414         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
415
416         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
417
418         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
419     }
420
421     @Test
422     public void testDelete() throws Exception {
423         dataStoreContextBuilder.shardBatchedModificationCount(1);
424         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
425
426         expectBatchedModifications(actorRef, 1);
427
428         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
429
430         transactionProxy.delete(TestModel.TEST_PATH);
431
432         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
433     }
434
435     @Test
436     public void testReadWrite() throws Exception {
437         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
438
439         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
440
441         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
442                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
443
444         expectBatchedModifications(actorRef, 1);
445
446         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
447
448         transactionProxy.read(TestModel.TEST_PATH);
449
450         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.read(TestModel.TEST_PATH);
455
456         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
457         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
458
459         verifyBatchedModifications(batchedModifications.get(0), false,
460                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
461     }
462
463     @Test
464     public void testReadyWithReadWrite() throws Exception {
465         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
466
467         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
468
469         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
470                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
471
472         expectBatchedModificationsReady(actorRef, true);
473
474         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
475
476         transactionProxy.read(TestModel.TEST_PATH);
477
478         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
479
480         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
481
482         assertTrue(ready instanceof SingleCommitCohortProxy);
483
484         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
485
486         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
487         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
488
489         verifyBatchedModifications(batchedModifications.get(0), true, true,
490                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
491
492         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
493     }
494
495     @Test
496     public void testReadyWithNoModifications() throws Exception {
497         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
498
499         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
500                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
501
502         expectBatchedModificationsReady(actorRef, true);
503
504         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
505
506         transactionProxy.read(TestModel.TEST_PATH);
507
508         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
509
510         assertTrue(ready instanceof SingleCommitCohortProxy);
511
512         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
513
514         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
515         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
516
517         verifyBatchedModifications(batchedModifications.get(0), true, true);
518     }
519
520     @Test
521     public void testReadyWithMultipleShardWrites() throws Exception {
522         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
523
524         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
525
526         expectBatchedModificationsReady(actorRef1);
527         expectBatchedModificationsReady(actorRef2);
528
529         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
530
531         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
532         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
533
534         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
535
536         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
537
538         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
539                 actorSelection(actorRef2));
540     }
541
542     @Test
543     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
544         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
545
546         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
547
548         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
549
550         expectBatchedModificationsReady(actorRef, true);
551
552         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
553
554         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
555
556         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
557
558         assertTrue(ready instanceof SingleCommitCohortProxy);
559
560         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
561
562         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
563         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
564
565         verifyBatchedModifications(batchedModifications.get(0), true, true,
566                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
567     }
568
569     @Test
570     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
571         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
572         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
573
574         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
575
576         expectBatchedModificationsReady(actorRef, true);
577
578         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
579
580         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
581
582         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
583
584         assertTrue(ready instanceof SingleCommitCohortProxy);
585
586         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
587
588         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
589         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
590
591         verifyBatchedModifications(batchedModifications.get(0), false,
592                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
593
594         verifyBatchedModifications(batchedModifications.get(1), true, true);
595     }
596
597     @Test
598     public void testReadyWithReplyFailure() throws Exception {
599         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
600
601         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
602
603         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
604
605         expectFailedBatchedModifications(actorRef);
606
607         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
608
609         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
610
611         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
612
613         assertTrue(ready instanceof SingleCommitCohortProxy);
614
615         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
616     }
617
618     @Test
619     public void testReadyWithDebugContextEnabled() throws Exception {
620         dataStoreContextBuilder.transactionDebugContextEnabled(true);
621
622         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
623
624         expectBatchedModificationsReady(actorRef, true);
625
626         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
627
628         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
629
630         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
631
632         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
633
634         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
635     }
636
637     @Test
638     public void testReadyWithLocalTransaction() throws Exception {
639         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
640
641         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
642                 .actorSelection(shardActorRef.path().toString());
643
644         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
645                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
646
647         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
648
649         expectReadyLocalTransaction(shardActorRef, true);
650
651         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
652         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
653
654         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
655         assertTrue(ready instanceof SingleCommitCohortProxy);
656         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
657     }
658
659     @Test
660     public void testReadyWithLocalTransactionWithFailure() throws Exception {
661         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
662
663         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
664                 .actorSelection(shardActorRef.path().toString());
665
666         DataTree mockDataTree = createDataTree();
667         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
668         doThrow(new RuntimeException("mock")).when(mockModification).ready();
669
670         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
671                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
672
673         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
674
675         expectReadyLocalTransaction(shardActorRef, true);
676
677         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
678         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
679
680         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
681         assertTrue(ready instanceof SingleCommitCohortProxy);
682         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
683     }
684
685     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
686         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
687
688         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
689
690         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
691
692         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
693
694         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
695
696         transactionProxy.delete(TestModel.TEST_PATH);
697
698         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
699
700         assertTrue(ready instanceof SingleCommitCohortProxy);
701
702         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
703     }
704
705     @Test
706     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
707         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
708     }
709
710     @Test
711     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
712         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
713     }
714
715     @Test
716     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
717         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
718     }
719
720     @Test
721     public void testReadyWithInvalidReplyMessageType() throws Exception {
722         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
723         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
724
725         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
726
727         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
728                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
729
730         expectBatchedModificationsReady(actorRef2);
731
732         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
733
734         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
735         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
736
737         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
738
739         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
740
741         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
742                 IllegalArgumentException.class);
743     }
744
745     @Test
746     public void testGetIdentifier() {
747         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
748         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
749
750         Object id = transactionProxy.getIdentifier();
751         assertNotNull("getIdentifier returned null", id);
752         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
753     }
754
755     @Test
756     public void testClose() throws Exception {
757         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
758
759         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
760                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
761
762         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
763
764         transactionProxy.read(TestModel.TEST_PATH);
765
766         transactionProxy.close();
767
768         verify(mockActorContext).sendOperationAsync(
769                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
770     }
771
772     private interface TransactionProxyOperation {
773         void run(TransactionProxy transactionProxy);
774     }
775
776     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef) {
777         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
778     }
779
780     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree) {
781         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
782                 dataTree);
783     }
784
785     private void throttleOperation(TransactionProxyOperation operation) {
786         throttleOperation(operation, 1, true);
787     }
788
789     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound) {
790         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
791                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
792     }
793
794     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound,
795             long expectedCompletionTime) {
796         ActorSystem actorSystem = getSystem();
797         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
798
799         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
800         // we now allow one extra permit to be allowed for ready
801         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
802                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
803                         .getDatastoreContext();
804
805         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
806                 .actorSelection(shardActorRef.path().toString());
807
808         if (shardFound) {
809             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
810                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
811             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
812                     .findPrimaryShardAsync(eq("cars"));
813
814         } else {
815             doReturn(Futures.failed(new Exception("not found")))
816                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
817         }
818
819         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
820                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
821                 any(Timeout.class));
822
823         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
824
825         long start = System.nanoTime();
826
827         operation.run(transactionProxy);
828
829         long end = System.nanoTime();
830
831         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
832                 expectedCompletionTime, end - start),
833                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
834
835     }
836
837     private void completeOperation(TransactionProxyOperation operation) {
838         completeOperation(operation, true);
839     }
840
841     private void completeOperation(TransactionProxyOperation operation, boolean shardFound) {
842         ActorSystem actorSystem = getSystem();
843         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
844
845         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
846                 .actorSelection(shardActorRef.path().toString());
847
848         if (shardFound) {
849             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
850                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
851         } else {
852             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
853                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
854         }
855
856         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
857         String actorPath = txActorRef.path().toString();
858         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
859                 DataStoreVersions.CURRENT_VERSION);
860
861         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
862
863         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
864                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
865                 any(Timeout.class));
866
867         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
868
869         long start = System.nanoTime();
870
871         operation.run(transactionProxy);
872
873         long end = System.nanoTime();
874
875         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
876                 .getOperationTimeoutInMillis());
877         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
878                 expected, end - start), end - start <= expected);
879     }
880
881     private void completeOperationLocal(TransactionProxyOperation operation, DataTree dataTree) {
882         ActorSystem actorSystem = getSystem();
883         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
884
885         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
886                 .actorSelection(shardActorRef.path().toString());
887
888         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
889                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
890
891         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
892
893         long start = System.nanoTime();
894
895         operation.run(transactionProxy);
896
897         long end = System.nanoTime();
898
899         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
900                 .getOperationTimeoutInMillis());
901         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
902                 end - start <= expected);
903     }
904
905     private static DataTree createDataTree() {
906         DataTree dataTree = mock(DataTree.class);
907         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
908         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
909
910         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
911         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
912
913         return dataTree;
914     }
915
916     private static DataTree createDataTree(NormalizedNode<?, ?> readResponse) {
917         DataTree dataTree = mock(DataTree.class);
918         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
919         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
920
921         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
922         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
923         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
924
925         return dataTree;
926     }
927
928
929     @Test
930     public void testWriteCompletionForLocalShard() {
931         completeOperationLocal(transactionProxy -> {
932             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
933
934             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
935
936             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
937
938         }, createDataTree());
939     }
940
941     @Test
942     public void testWriteThrottlingWhenShardFound() {
943         throttleOperation(transactionProxy -> {
944             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
945
946             expectIncompleteBatchedModifications();
947
948             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
949
950             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
951         });
952     }
953
954     @Test
955     public void testWriteThrottlingWhenShardNotFound() {
956         // Confirm that there is no throttling when the Shard is not found
957         completeOperation(transactionProxy -> {
958             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
959
960             expectBatchedModifications(2);
961
962             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
963
964             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
965         }, false);
966
967     }
968
969
970     @Test
971     public void testWriteCompletion() {
972         completeOperation(transactionProxy -> {
973             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
974
975             expectBatchedModifications(2);
976
977             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
978
979             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
980         });
981     }
982
983     @Test
984     public void testMergeThrottlingWhenShardFound() {
985         throttleOperation(transactionProxy -> {
986             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
987
988             expectIncompleteBatchedModifications();
989
990             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
991
992             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
993         });
994     }
995
996     @Test
997     public void testMergeThrottlingWhenShardNotFound() {
998         completeOperation(transactionProxy -> {
999             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1000
1001             expectBatchedModifications(2);
1002
1003             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1004
1005             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1006         }, false);
1007     }
1008
1009     @Test
1010     public void testMergeCompletion() {
1011         completeOperation(transactionProxy -> {
1012             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1013
1014             expectBatchedModifications(2);
1015
1016             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1017
1018             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1019         });
1020
1021     }
1022
1023     @Test
1024     public void testMergeCompletionForLocalShard() {
1025         completeOperationLocal(transactionProxy -> {
1026             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1027
1028             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1029
1030             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1031
1032         }, createDataTree());
1033     }
1034
1035
1036     @Test
1037     public void testDeleteThrottlingWhenShardFound() {
1038
1039         throttleOperation(transactionProxy -> {
1040             expectIncompleteBatchedModifications();
1041
1042             transactionProxy.delete(TestModel.TEST_PATH);
1043
1044             transactionProxy.delete(TestModel.TEST_PATH);
1045         });
1046     }
1047
1048
1049     @Test
1050     public void testDeleteThrottlingWhenShardNotFound() {
1051
1052         completeOperation(transactionProxy -> {
1053             expectBatchedModifications(2);
1054
1055             transactionProxy.delete(TestModel.TEST_PATH);
1056
1057             transactionProxy.delete(TestModel.TEST_PATH);
1058         }, false);
1059     }
1060
1061     @Test
1062     public void testDeleteCompletionForLocalShard() {
1063         completeOperationLocal(transactionProxy -> {
1064
1065             transactionProxy.delete(TestModel.TEST_PATH);
1066
1067             transactionProxy.delete(TestModel.TEST_PATH);
1068         }, createDataTree());
1069
1070     }
1071
1072     @Test
1073     public void testDeleteCompletion() {
1074         completeOperation(transactionProxy -> {
1075             expectBatchedModifications(2);
1076
1077             transactionProxy.delete(TestModel.TEST_PATH);
1078
1079             transactionProxy.delete(TestModel.TEST_PATH);
1080         });
1081
1082     }
1083
1084     @Test
1085     public void testReadThrottlingWhenShardFound() {
1086
1087         throttleOperation(transactionProxy -> {
1088             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1089                     any(ActorSelection.class), eqReadData());
1090
1091             transactionProxy.read(TestModel.TEST_PATH);
1092
1093             transactionProxy.read(TestModel.TEST_PATH);
1094         });
1095     }
1096
1097     @Test
1098     public void testReadThrottlingWhenShardNotFound() {
1099
1100         completeOperation(transactionProxy -> {
1101             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1102                     any(ActorSelection.class), eqReadData());
1103
1104             transactionProxy.read(TestModel.TEST_PATH);
1105
1106             transactionProxy.read(TestModel.TEST_PATH);
1107         }, false);
1108     }
1109
1110
1111     @Test
1112     public void testReadCompletion() {
1113         completeOperation(transactionProxy -> {
1114             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1115
1116             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1117                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1118
1119             transactionProxy.read(TestModel.TEST_PATH);
1120
1121             transactionProxy.read(TestModel.TEST_PATH);
1122         });
1123
1124     }
1125
1126     @Test
1127     public void testReadCompletionForLocalShard() {
1128         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1129         completeOperationLocal(transactionProxy -> {
1130             transactionProxy.read(TestModel.TEST_PATH);
1131
1132             transactionProxy.read(TestModel.TEST_PATH);
1133         }, createDataTree(nodeToRead));
1134
1135     }
1136
1137     @Test
1138     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1139         completeOperationLocal(transactionProxy -> {
1140             transactionProxy.read(TestModel.TEST_PATH);
1141
1142             transactionProxy.read(TestModel.TEST_PATH);
1143         }, createDataTree());
1144
1145     }
1146
1147     @Test
1148     public void testExistsThrottlingWhenShardFound() {
1149
1150         throttleOperation(transactionProxy -> {
1151             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1152                     any(ActorSelection.class), eqDataExists());
1153
1154             transactionProxy.exists(TestModel.TEST_PATH);
1155
1156             transactionProxy.exists(TestModel.TEST_PATH);
1157         });
1158     }
1159
1160     @Test
1161     public void testExistsThrottlingWhenShardNotFound() {
1162
1163         completeOperation(transactionProxy -> {
1164             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1165                     any(ActorSelection.class), eqDataExists());
1166
1167             transactionProxy.exists(TestModel.TEST_PATH);
1168
1169             transactionProxy.exists(TestModel.TEST_PATH);
1170         }, false);
1171     }
1172
1173
1174     @Test
1175     public void testExistsCompletion() {
1176         completeOperation(transactionProxy -> {
1177             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1178                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1179
1180             transactionProxy.exists(TestModel.TEST_PATH);
1181
1182             transactionProxy.exists(TestModel.TEST_PATH);
1183         });
1184
1185     }
1186
1187     @Test
1188     public void testExistsCompletionForLocalShard() {
1189         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190         completeOperationLocal(transactionProxy -> {
1191             transactionProxy.exists(TestModel.TEST_PATH);
1192
1193             transactionProxy.exists(TestModel.TEST_PATH);
1194         }, createDataTree(nodeToRead));
1195
1196     }
1197
1198     @Test
1199     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1200         completeOperationLocal(transactionProxy -> {
1201             transactionProxy.exists(TestModel.TEST_PATH);
1202
1203             transactionProxy.exists(TestModel.TEST_PATH);
1204         }, createDataTree());
1205
1206     }
1207
1208     @Test
1209     public void testReadyThrottling() {
1210
1211         throttleOperation(transactionProxy -> {
1212             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1213
1214             expectBatchedModifications(1);
1215
1216             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1217
1218             transactionProxy.ready();
1219         });
1220     }
1221
1222     @Test
1223     public void testReadyThrottlingWithTwoTransactionContexts() {
1224         throttleOperation(transactionProxy -> {
1225             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1226             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1227
1228             expectBatchedModifications(2);
1229
1230             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1231
1232             // Trying to write to Cars will cause another transaction context to get created
1233             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1234
1235             // Now ready should block for both transaction contexts
1236             transactionProxy.ready();
1237         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1238                 .getOperationTimeoutInMillis()) * 2);
1239     }
1240
1241     private void testModificationOperationBatching(TransactionType type) throws Exception {
1242         int shardBatchedModificationCount = 3;
1243         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1244
1245         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1246
1247         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1248
1249         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1250         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1251
1252         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1253         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1254
1255         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1256         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1257
1258         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1259         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1260
1261         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1262         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1263
1264         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1265         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1266
1267         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1268         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1269
1270         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1271
1272         transactionProxy.write(writePath1, writeNode1);
1273         transactionProxy.write(writePath2, writeNode2);
1274         transactionProxy.delete(deletePath1);
1275         transactionProxy.merge(mergePath1, mergeNode1);
1276         transactionProxy.merge(mergePath2, mergeNode2);
1277         transactionProxy.write(writePath3, writeNode3);
1278         transactionProxy.merge(mergePath3, mergeNode3);
1279         transactionProxy.delete(deletePath2);
1280
1281         // This sends the last batch.
1282         transactionProxy.ready();
1283
1284         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1285         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1286
1287         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1288                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1289
1290         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1291                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1292
1293         verifyBatchedModifications(batchedModifications.get(2), true, true,
1294                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1295
1296         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1297     }
1298
1299     @Test
1300     public void testReadWriteModificationOperationBatching() throws Exception {
1301         testModificationOperationBatching(READ_WRITE);
1302     }
1303
1304     @Test
1305     public void testWriteOnlyModificationOperationBatching() throws Exception {
1306         testModificationOperationBatching(WRITE_ONLY);
1307     }
1308
1309     @Test
1310     public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1311         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1312         testModificationOperationBatching(WRITE_ONLY);
1313     }
1314
1315     @Test
1316     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1317
1318         int shardBatchedModificationCount = 10;
1319         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1320
1321         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1322
1323         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1324
1325         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1326         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1327
1328         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1329         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1330
1331         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1332         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1333
1334         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1335         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1336
1337         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1338
1339         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1340                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1341
1342         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1343                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1344
1345         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1346                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1347
1348         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1349
1350         transactionProxy.write(writePath1, writeNode1);
1351         transactionProxy.write(writePath2, writeNode2);
1352
1353         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1354
1355         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1356         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1357
1358         transactionProxy.merge(mergePath1, mergeNode1);
1359         transactionProxy.merge(mergePath2, mergeNode2);
1360
1361         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1362
1363         transactionProxy.delete(deletePath);
1364
1365         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1366         assertEquals("Exists response", true, exists);
1367
1368         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1369         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1370
1371         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1372         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1373
1374         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1375                 new WriteModification(writePath2, writeNode2));
1376
1377         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1378                 new MergeModification(mergePath2, mergeNode2));
1379
1380         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1381
1382         InOrder inOrder = Mockito.inOrder(mockActorContext);
1383         inOrder.verify(mockActorContext).executeOperationAsync(
1384                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1385
1386         inOrder.verify(mockActorContext).executeOperationAsync(
1387                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1388
1389         inOrder.verify(mockActorContext).executeOperationAsync(
1390                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1391
1392         inOrder.verify(mockActorContext).executeOperationAsync(
1393                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1394
1395         inOrder.verify(mockActorContext).executeOperationAsync(
1396                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1397
1398         inOrder.verify(mockActorContext).executeOperationAsync(
1399                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1400     }
1401
1402     @Test
1403     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1404             java.util.concurrent.TimeoutException {
1405         SchemaContext schemaContext = SchemaContextHelper.full();
1406         Configuration configuration = mock(Configuration.class);
1407         doReturn(configuration).when(mockActorContext).getConfiguration();
1408         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1409         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1410
1411         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1412         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1413
1414         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1415         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1416
1417         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1418
1419         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1420
1421         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1422
1423         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1424                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1425
1426         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1427
1428         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1429
1430         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1431
1432         @SuppressWarnings("unchecked")
1433         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1434
1435         for (NormalizedNode<?,?> node : collection) {
1436             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1437         }
1438
1439         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1440                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1441
1442         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1443
1444         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1445                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1446
1447         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1448     }
1449
1450
1451     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1452         ActorSystem actorSystem = getSystem();
1453         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1454
1455         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1456                 .actorSelection(shardActorRef.path().toString());
1457
1458         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1459                 .findPrimaryShardAsync(eq(shardName));
1460
1461         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1462
1463         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1464                 .actorSelection(txActorRef.path().toString());
1465
1466         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1467                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1468                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1469
1470         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1471                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1472     }
1473 }