Use java.util.Optional in ShardLeaderStateChanged and PrimaryShardInfo
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.ActorSystem;
28 import akka.actor.Props;
29 import akka.dispatch.Futures;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Sets;
33 import com.google.common.util.concurrent.CheckedFuture;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
52 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
61 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
62 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Promise;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest extends AbstractTransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     @Test
89     public void testRead() throws Exception {
90         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
91
92         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
93
94         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
95                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
96
97         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
98                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
99
100         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
101
102         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
103
104         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
105                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
106
107         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108
109         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
110
111         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
112     }
113
114     @Test(expected = ReadFailedException.class)
115     public void testReadWithInvalidReplyMessageType() throws Exception {
116         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
117
118         doReturn(Futures.successful(new Object())).when(mockActorContext).
119                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
120
121         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
122
123         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
124     }
125
126     @Test(expected = TestException.class)
127     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
128         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
129
130         doReturn(Futures.failed(new TestException())).when(mockActorContext).
131                 executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
132
133         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
134
135         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
136     }
137
138     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
139             throws Throwable {
140         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
141
142         if (exToThrow instanceof PrimaryNotFoundException) {
143             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
144         } else {
145             doReturn(primaryShardInfoReply(getSystem(), actorRef)).
146                     when(mockActorContext).findPrimaryShardAsync(anyString());
147         }
148
149         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
150                 any(ActorSelection.class), any(), any(Timeout.class));
151
152         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
153
154         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
155     }
156
157     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
158         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
159             @Override
160             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
161                 return proxy.read(TestModel.TEST_PATH);
162             }
163         });
164     }
165
166     @Test(expected = PrimaryNotFoundException.class)
167     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
168         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
169     }
170
171     @Test(expected = TimeoutException.class)
172     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
173         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
174                 new Exception("reason")));
175     }
176
177     @Test(expected = TestException.class)
178     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
179         testReadWithExceptionOnInitialCreateTransaction(new TestException());
180     }
181
182     @Test
183     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
184         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185
186         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187
188         expectBatchedModifications(actorRef, 1);
189
190         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
191                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
192
193         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194
195         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196
197         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
198                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199
200         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
201         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
202
203         InOrder inOrder = Mockito.inOrder(mockActorContext);
204         inOrder.verify(mockActorContext).executeOperationAsync(
205                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
206
207         inOrder.verify(mockActorContext).executeOperationAsync(
208                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
209     }
210
211     @Test(expected=IllegalStateException.class)
212     public void testReadPreConditionCheck() {
213         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
214         transactionProxy.read(TestModel.TEST_PATH);
215     }
216
217     @Test(expected=IllegalArgumentException.class)
218     public void testInvalidCreateTransactionReply() throws Throwable {
219         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
220
221         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
222             actorSelection(actorRef.path().toString());
223
224         doReturn(primaryShardInfoReply(getSystem(), actorRef)).
225             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
228             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
229             any(Timeout.class));
230
231         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
232
233         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234     }
235
236     @Test
237     public void testExists() throws Exception {
238         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239
240         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241
242         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
243                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
244
245         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246
247         assertEquals("Exists response", false, exists);
248
249         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
251
252         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253
254         assertEquals("Exists response", true, exists);
255     }
256
257     @Test(expected = PrimaryNotFoundException.class)
258     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
259         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
260             @Override
261             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
262                 return proxy.exists(TestModel.TEST_PATH);
263             }
264         });
265     }
266
267     @Test(expected = ReadFailedException.class)
268     public void testExistsWithInvalidReplyMessageType() throws Exception {
269         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270
271         doReturn(Futures.successful(new Object())).when(mockActorContext).
272                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
273
274         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275
276         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
281         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282
283         doReturn(Futures.failed(new TestException())).when(mockActorContext).
284                 executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287
288         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289     }
290
291     @Test
292     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
293         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294
295         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297         expectBatchedModifications(actorRef, 1);
298
299         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
300                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
301
302         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303
304         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305
306         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307
308         assertEquals("Exists response", true, exists);
309
310         InOrder inOrder = Mockito.inOrder(mockActorContext);
311         inOrder.verify(mockActorContext).executeOperationAsync(
312                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
313
314         inOrder.verify(mockActorContext).executeOperationAsync(
315                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testExistsPreConditionCheck() {
320         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
321         transactionProxy.exists(TestModel.TEST_PATH);
322     }
323
324     @Test
325     public void testWrite() throws Exception {
326         dataStoreContextBuilder.shardBatchedModificationCount(1);
327         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328
329         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330
331         expectBatchedModifications(actorRef, 1);
332
333         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334
335         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336
337         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338     }
339
340     @Test
341     public void testWriteAfterAsyncRead() throws Throwable {
342         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
343
344         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
345         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
346                 eq(getSystem().actorSelection(actorRef.path())),
347                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348
349         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
350                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
351
352         expectBatchedModificationsReady(actorRef);
353
354         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355
356         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357
358         final CountDownLatch readComplete = new CountDownLatch(1);
359         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
360         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
361                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
362                     @Override
363                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
364                         try {
365                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
366                         } catch (Exception e) {
367                             caughtEx.set(e);
368                         } finally {
369                             readComplete.countDown();
370                         }
371                     }
372
373                     @Override
374                     public void onFailure(Throwable t) {
375                         caughtEx.set(t);
376                         readComplete.countDown();
377                     }
378                 });
379
380         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381
382         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383
384         if(caughtEx.get() != null) {
385             throw caughtEx.get();
386         }
387
388         // This sends the batched modification.
389         transactionProxy.ready();
390
391         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testWritePreConditionCheck() {
396         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
397         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testWriteAfterReadyPreConditionCheck() {
402         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403
404         transactionProxy.ready();
405
406         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
407     }
408
409     @Test
410     public void testMerge() throws Exception {
411         dataStoreContextBuilder.shardBatchedModificationCount(1);
412         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413
414         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415
416         expectBatchedModifications(actorRef, 1);
417
418         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419
420         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421
422         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
423     }
424
425     @Test
426     public void testDelete() throws Exception {
427         dataStoreContextBuilder.shardBatchedModificationCount(1);
428         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429
430         expectBatchedModifications(actorRef, 1);
431
432         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433
434         transactionProxy.delete(TestModel.TEST_PATH);
435
436         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
437     }
438
439     @Test
440     public void testReadWrite() throws Exception {
441         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442
443         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444
445         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
446                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
447
448         expectBatchedModifications(actorRef, 1);
449
450         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
451
452         transactionProxy.read(TestModel.TEST_PATH);
453
454         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         transactionProxy.read(TestModel.TEST_PATH);
459
460         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
461         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462
463         verifyBatchedModifications(batchedModifications.get(0), false,
464                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
465     }
466
467     @Test
468     public void testReadyWithReadWrite() throws Exception {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
475
476         expectBatchedModificationsReady(actorRef, true);
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479
480         transactionProxy.read(TestModel.TEST_PATH);
481
482         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483
484         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485
486         assertTrue(ready instanceof SingleCommitCohortProxy);
487
488         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489
490         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
491         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492
493         verifyBatchedModifications(batchedModifications.get(0), true, true,
494                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495
496         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
497     }
498
499     @Test
500     public void testReadyWithNoModifications() throws Exception {
501         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502
503         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
505
506         expectBatchedModificationsReady(actorRef, true);
507
508         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509
510         transactionProxy.read(TestModel.TEST_PATH);
511
512         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513
514         assertTrue(ready instanceof SingleCommitCohortProxy);
515
516         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517
518         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
519         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
520
521         verifyBatchedModifications(batchedModifications.get(0), true, true);
522     }
523
524     @Test
525     public void testReadyWithMultipleShardWrites() throws Exception {
526         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527
528         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529
530         expectBatchedModificationsReady(actorRef1);
531         expectBatchedModificationsReady(actorRef2);
532
533         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534
535         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
536         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539
540         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541
542         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
543                 actorSelection(actorRef2));
544     }
545
546     @Test
547     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
548         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549
550         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551
552         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553
554         expectBatchedModificationsReady(actorRef, true);
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557
558         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559
560         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561
562         assertTrue(ready instanceof SingleCommitCohortProxy);
563
564         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565
566         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
567         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568
569         verifyBatchedModifications(batchedModifications.get(0), true, true,
570                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
571     }
572
573     @Test
574     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
575         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
576         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577
578         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579
580         expectBatchedModificationsReady(actorRef, true);
581
582         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583
584         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585
586         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587
588         assertTrue(ready instanceof SingleCommitCohortProxy);
589
590         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591
592         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
593         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
594
595         verifyBatchedModifications(batchedModifications.get(0), false,
596                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
597
598         verifyBatchedModifications(batchedModifications.get(1), true, true);
599     }
600
601     @Test
602     public void testReadyWithReplyFailure() throws Exception {
603         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604
605         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606
607         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608
609         expectFailedBatchedModifications(actorRef);
610
611         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616
617         assertTrue(ready instanceof SingleCommitCohortProxy);
618
619         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
620     }
621
622     @Test
623     public void testReadyWithDebugContextEnabled() throws Exception {
624         dataStoreContextBuilder.transactionDebugContextEnabled(true);
625
626         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631
632         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637
638         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
639     }
640
641     @Test
642     public void testReadyWithLocalTransaction() throws Exception {
643         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644
645         doReturn(getSystem().actorSelection(shardActorRef.path())).
646                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
647
648         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).
649                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652
653         expectReadyLocalTransaction(shardActorRef, true);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657
658         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
659         assertTrue(ready instanceof SingleCommitCohortProxy);
660         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
661     }
662
663     @Test
664     public void testReadyWithLocalTransactionWithFailure() throws Exception {
665         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666
667         doReturn(getSystem().actorSelection(shardActorRef.path())).
668                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
669
670         DataTree mockDataTree = createDataTree();
671         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
672         doThrow(new RuntimeException("mock")).when(mockModification).ready();
673
674         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).
675                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676
677         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678
679         expectReadyLocalTransaction(shardActorRef, true);
680
681         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
682         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683
684         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685         assertTrue(ready instanceof SingleCommitCohortProxy);
686         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
687     }
688
689     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
690         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691
692         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693
694         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695
696         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699
700         transactionProxy.delete(TestModel.TEST_PATH);
701
702         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703
704         assertTrue(ready instanceof SingleCommitCohortProxy);
705
706         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
707     }
708
709     @Test
710     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
711         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
712     }
713
714     @Test
715     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
716         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
717     }
718
719     @Test
720     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
721         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
722     }
723
724     @Test
725     public void testReadyWithInvalidReplyMessageType() throws Exception {
726         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
727         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728
729         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
730
731         doReturn(Futures.successful(new Object())).when(mockActorContext).
732                 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class),
733                         any(Timeout.class));
734
735         expectBatchedModificationsReady(actorRef2);
736
737         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
738
739         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
740         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
741
742         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
743
744         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
745
746         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
747                 IllegalArgumentException.class);
748     }
749
750     @Test
751     public void testGetIdentifier() {
752         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
753         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
754
755         Object id = transactionProxy.getIdentifier();
756         assertNotNull("getIdentifier returned null", id);
757         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
758     }
759
760     @Test
761     public void testClose() throws Exception{
762         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
763
764         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
765                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
766
767         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
768
769         transactionProxy.read(TestModel.TEST_PATH);
770
771         transactionProxy.close();
772
773         verify(mockActorContext).sendOperationAsync(
774                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
775     }
776
777     private static interface TransactionProxyOperation {
778         void run(TransactionProxy transactionProxy);
779     }
780
781     private void throttleOperation(TransactionProxyOperation operation) {
782         throttleOperation(operation, 1, true);
783     }
784
785     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
786         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
787                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
788     }
789
790     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
791         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
792     }
793
794     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree){
795         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
796                 dataTree);
797     }
798
799
800     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
801         ActorSystem actorSystem = getSystem();
802         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
803
804         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
805         // we now allow one extra permit to be allowed for ready
806         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
807                 shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
808
809         doReturn(actorSystem.actorSelection(shardActorRef.path())).
810                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
811
812         if(shardFound) {
813             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
814                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
815             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
816                     when(mockActorContext).findPrimaryShardAsync(eq("cars"));
817
818         } else {
819             doReturn(Futures.failed(new Exception("not found")))
820                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
821         }
822
823         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
824
825         doReturn(incompleteFuture()).when(mockActorContext).
826         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
827                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
828
829         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
830
831         long start = System.nanoTime();
832
833         operation.run(transactionProxy);
834
835         long end = System.nanoTime();
836
837         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
838                 expectedCompletionTime, (end-start)),
839                 ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
840
841     }
842
843     private void completeOperation(TransactionProxyOperation operation){
844         completeOperation(operation, true);
845     }
846
847     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
848         ActorSystem actorSystem = getSystem();
849         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
850
851         doReturn(actorSystem.actorSelection(shardActorRef.path())).
852                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
853
854         if(shardFound) {
855             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
856                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
857         } else {
858             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
859                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
860         }
861
862         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
863         String actorPath = txActorRef.path().toString();
864         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
865                 DataStoreVersions.CURRENT_VERSION);
866
867         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
868
869         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
870                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
871                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
872
873         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
874
875         long start = System.nanoTime();
876
877         operation.run(transactionProxy);
878
879         long end = System.nanoTime();
880
881         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
882         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
883                 expected, (end-start)), (end - start) <= expected);
884     }
885
886     private void completeOperationLocal(TransactionProxyOperation operation, DataTree dataTree){
887         ActorSystem actorSystem = getSystem();
888         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
889
890         doReturn(actorSystem.actorSelection(shardActorRef.path())).
891                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
892
893         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).
894                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
895
896         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
897
898         long start = System.nanoTime();
899
900         operation.run(transactionProxy);
901
902         long end = System.nanoTime();
903
904         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
905         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
906                 expected, (end-start)), (end - start) <= expected);
907     }
908
909     private static DataTree createDataTree(){
910         DataTree dataTree = mock(DataTree.class);
911         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
912         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
913
914         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
915         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
916
917         return dataTree;
918     }
919
920     private static DataTree createDataTree(NormalizedNode<?, ?> readResponse){
921         DataTree dataTree = mock(DataTree.class);
922         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
923         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
924
925         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
926         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
927         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
928
929         return dataTree;
930     }
931
932
933     @Test
934     public void testWriteCompletionForLocalShard(){
935         completeOperationLocal(new TransactionProxyOperation() {
936             @Override
937             public void run(TransactionProxy transactionProxy) {
938                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
939
940                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
941
942                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
943
944             }
945         }, createDataTree());
946     }
947
948     @Test
949     public void testWriteThrottlingWhenShardFound(){
950         throttleOperation(new TransactionProxyOperation() {
951             @Override
952             public void run(TransactionProxy transactionProxy) {
953                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
954
955                 expectIncompleteBatchedModifications();
956
957                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
958
959                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
960             }
961         });
962     }
963
964     @Test
965     public void testWriteThrottlingWhenShardNotFound(){
966         // Confirm that there is no throttling when the Shard is not found
967         completeOperation(new TransactionProxyOperation() {
968             @Override
969             public void run(TransactionProxy transactionProxy) {
970                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
971
972                 expectBatchedModifications(2);
973
974                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
975
976                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
977             }
978         }, false);
979
980     }
981
982
983     @Test
984     public void testWriteCompletion(){
985         completeOperation(new TransactionProxyOperation() {
986             @Override
987             public void run(TransactionProxy transactionProxy) {
988                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
989
990                 expectBatchedModifications(2);
991
992                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
993
994                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
995             }
996         });
997     }
998
999     @Test
1000     public void testMergeThrottlingWhenShardFound(){
1001         throttleOperation(new TransactionProxyOperation() {
1002             @Override
1003             public void run(TransactionProxy transactionProxy) {
1004                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1005
1006                 expectIncompleteBatchedModifications();
1007
1008                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1009
1010                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1011             }
1012         });
1013     }
1014
1015     @Test
1016     public void testMergeThrottlingWhenShardNotFound(){
1017         completeOperation(new TransactionProxyOperation() {
1018             @Override
1019             public void run(TransactionProxy transactionProxy) {
1020                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1021
1022                 expectBatchedModifications(2);
1023
1024                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1025
1026                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1027             }
1028         }, false);
1029     }
1030
1031     @Test
1032     public void testMergeCompletion(){
1033         completeOperation(new TransactionProxyOperation() {
1034             @Override
1035             public void run(TransactionProxy transactionProxy) {
1036                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1037
1038                 expectBatchedModifications(2);
1039
1040                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1041
1042                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1043             }
1044         });
1045
1046     }
1047
1048     @Test
1049     public void testMergeCompletionForLocalShard(){
1050         completeOperationLocal(new TransactionProxyOperation() {
1051             @Override
1052             public void run(TransactionProxy transactionProxy) {
1053                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1054
1055                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1056
1057                 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1058
1059             }
1060         }, createDataTree());
1061     }
1062
1063
1064     @Test
1065     public void testDeleteThrottlingWhenShardFound(){
1066
1067         throttleOperation(new TransactionProxyOperation() {
1068             @Override
1069             public void run(TransactionProxy transactionProxy) {
1070                 expectIncompleteBatchedModifications();
1071
1072                 transactionProxy.delete(TestModel.TEST_PATH);
1073
1074                 transactionProxy.delete(TestModel.TEST_PATH);
1075             }
1076         });
1077     }
1078
1079
1080     @Test
1081     public void testDeleteThrottlingWhenShardNotFound(){
1082
1083         completeOperation(new TransactionProxyOperation() {
1084             @Override
1085             public void run(TransactionProxy transactionProxy) {
1086                 expectBatchedModifications(2);
1087
1088                 transactionProxy.delete(TestModel.TEST_PATH);
1089
1090                 transactionProxy.delete(TestModel.TEST_PATH);
1091             }
1092         }, false);
1093     }
1094
1095     @Test
1096     public void testDeleteCompletionForLocalShard(){
1097         completeOperationLocal(new TransactionProxyOperation() {
1098             @Override
1099             public void run(TransactionProxy transactionProxy) {
1100
1101                 transactionProxy.delete(TestModel.TEST_PATH);
1102
1103                 transactionProxy.delete(TestModel.TEST_PATH);
1104             }
1105         }, createDataTree());
1106
1107     }
1108
1109     @Test
1110     public void testDeleteCompletion(){
1111         completeOperation(new TransactionProxyOperation() {
1112             @Override
1113             public void run(TransactionProxy transactionProxy) {
1114                 expectBatchedModifications(2);
1115
1116                 transactionProxy.delete(TestModel.TEST_PATH);
1117
1118                 transactionProxy.delete(TestModel.TEST_PATH);
1119             }
1120         });
1121
1122     }
1123
1124     @Test
1125     public void testReadThrottlingWhenShardFound(){
1126
1127         throttleOperation(new TransactionProxyOperation() {
1128             @Override
1129             public void run(TransactionProxy transactionProxy) {
1130                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1131                         any(ActorSelection.class), eqReadData());
1132
1133                 transactionProxy.read(TestModel.TEST_PATH);
1134
1135                 transactionProxy.read(TestModel.TEST_PATH);
1136             }
1137         });
1138     }
1139
1140     @Test
1141     public void testReadThrottlingWhenShardNotFound(){
1142
1143         completeOperation(new TransactionProxyOperation() {
1144             @Override
1145             public void run(TransactionProxy transactionProxy) {
1146                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1147                         any(ActorSelection.class), eqReadData());
1148
1149                 transactionProxy.read(TestModel.TEST_PATH);
1150
1151                 transactionProxy.read(TestModel.TEST_PATH);
1152             }
1153         }, false);
1154     }
1155
1156
1157     @Test
1158     public void testReadCompletion(){
1159         completeOperation(new TransactionProxyOperation() {
1160             @Override
1161             public void run(TransactionProxy transactionProxy) {
1162                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1163
1164                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1165                         any(ActorSelection.class), eqReadData(), any(Timeout.class));
1166
1167                 transactionProxy.read(TestModel.TEST_PATH);
1168
1169                 transactionProxy.read(TestModel.TEST_PATH);
1170             }
1171         });
1172
1173     }
1174
1175     @Test
1176     public void testReadCompletionForLocalShard(){
1177         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1178         completeOperationLocal(new TransactionProxyOperation() {
1179             @Override
1180             public void run(TransactionProxy transactionProxy) {
1181                 transactionProxy.read(TestModel.TEST_PATH);
1182
1183                 transactionProxy.read(TestModel.TEST_PATH);
1184             }
1185         }, createDataTree(nodeToRead));
1186
1187     }
1188
1189     @Test
1190     public void testReadCompletionForLocalShardWhenExceptionOccurs(){
1191         completeOperationLocal(new TransactionProxyOperation() {
1192             @Override
1193             public void run(TransactionProxy transactionProxy) {
1194                 transactionProxy.read(TestModel.TEST_PATH);
1195
1196                 transactionProxy.read(TestModel.TEST_PATH);
1197             }
1198         }, createDataTree());
1199
1200     }
1201
1202     @Test
1203     public void testExistsThrottlingWhenShardFound(){
1204
1205         throttleOperation(new TransactionProxyOperation() {
1206             @Override
1207             public void run(TransactionProxy transactionProxy) {
1208                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1209                         any(ActorSelection.class), eqDataExists());
1210
1211                 transactionProxy.exists(TestModel.TEST_PATH);
1212
1213                 transactionProxy.exists(TestModel.TEST_PATH);
1214             }
1215         });
1216     }
1217
1218     @Test
1219     public void testExistsThrottlingWhenShardNotFound(){
1220
1221         completeOperation(new TransactionProxyOperation() {
1222             @Override
1223             public void run(TransactionProxy transactionProxy) {
1224                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1225                         any(ActorSelection.class), eqDataExists());
1226
1227                 transactionProxy.exists(TestModel.TEST_PATH);
1228
1229                 transactionProxy.exists(TestModel.TEST_PATH);
1230             }
1231         }, false);
1232     }
1233
1234
1235     @Test
1236     public void testExistsCompletion(){
1237         completeOperation(new TransactionProxyOperation() {
1238             @Override
1239             public void run(TransactionProxy transactionProxy) {
1240                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1241                         any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1242
1243                 transactionProxy.exists(TestModel.TEST_PATH);
1244
1245                 transactionProxy.exists(TestModel.TEST_PATH);
1246             }
1247         });
1248
1249     }
1250
1251     @Test
1252     public void testExistsCompletionForLocalShard(){
1253         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1254         completeOperationLocal(new TransactionProxyOperation() {
1255             @Override
1256             public void run(TransactionProxy transactionProxy) {
1257                 transactionProxy.exists(TestModel.TEST_PATH);
1258
1259                 transactionProxy.exists(TestModel.TEST_PATH);
1260             }
1261         }, createDataTree(nodeToRead));
1262
1263     }
1264
1265     @Test
1266     public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
1267         completeOperationLocal(new TransactionProxyOperation() {
1268             @Override
1269             public void run(TransactionProxy transactionProxy) {
1270                 transactionProxy.exists(TestModel.TEST_PATH);
1271
1272                 transactionProxy.exists(TestModel.TEST_PATH);
1273             }
1274         }, createDataTree());
1275
1276     }
1277     @Test
1278     public void testReadyThrottling(){
1279
1280         throttleOperation(new TransactionProxyOperation() {
1281             @Override
1282             public void run(TransactionProxy transactionProxy) {
1283                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1284
1285                 expectBatchedModifications(1);
1286
1287                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1288
1289                 transactionProxy.ready();
1290             }
1291         });
1292     }
1293
1294     @Test
1295     public void testReadyThrottlingWithTwoTransactionContexts(){
1296         throttleOperation(new TransactionProxyOperation() {
1297             @Override
1298             public void run(TransactionProxy transactionProxy) {
1299                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1300                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1301
1302                 expectBatchedModifications(2);
1303
1304                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1305
1306                 // Trying to write to Cars will cause another transaction context to get created
1307                 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1308
1309                 // Now ready should block for both transaction contexts
1310                 transactionProxy.ready();
1311             }
1312         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
1313     }
1314
1315     private void testModificationOperationBatching(TransactionType type) throws Exception {
1316         int shardBatchedModificationCount = 3;
1317         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1318
1319         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1320
1321         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1322
1323         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1324         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1325
1326         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1327         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1328
1329         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1330         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1331
1332         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1333         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1334
1335         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1336         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1337
1338         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1339         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1340
1341         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1342         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1343
1344         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1345
1346         transactionProxy.write(writePath1, writeNode1);
1347         transactionProxy.write(writePath2, writeNode2);
1348         transactionProxy.delete(deletePath1);
1349         transactionProxy.merge(mergePath1, mergeNode1);
1350         transactionProxy.merge(mergePath2, mergeNode2);
1351         transactionProxy.write(writePath3, writeNode3);
1352         transactionProxy.merge(mergePath3, mergeNode3);
1353         transactionProxy.delete(deletePath2);
1354
1355         // This sends the last batch.
1356         transactionProxy.ready();
1357
1358         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1359         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1360
1361         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1362                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1363
1364         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1365                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1366
1367         verifyBatchedModifications(batchedModifications.get(2), true, true,
1368                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1369
1370         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1371     }
1372
1373     @Test
1374     public void testReadWriteModificationOperationBatching() throws Throwable {
1375         testModificationOperationBatching(READ_WRITE);
1376     }
1377
1378     @Test
1379     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1380         testModificationOperationBatching(WRITE_ONLY);
1381     }
1382
1383     @Test
1384     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1385         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1386         testModificationOperationBatching(WRITE_ONLY);
1387     }
1388
1389     @Test
1390     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1391
1392         int shardBatchedModificationCount = 10;
1393         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1394
1395         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1396
1397         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1398
1399         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1400         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1401
1402         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1403         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1404
1405         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1406         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1407
1408         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1409         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1410
1411         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1412
1413         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1414                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1415
1416         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1417                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1418
1419         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1420                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1421
1422         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1423
1424         transactionProxy.write(writePath1, writeNode1);
1425         transactionProxy.write(writePath2, writeNode2);
1426
1427         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1428                 get(5, TimeUnit.SECONDS);
1429
1430         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1431         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1432
1433         transactionProxy.merge(mergePath1, mergeNode1);
1434         transactionProxy.merge(mergePath2, mergeNode2);
1435
1436         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1437
1438         transactionProxy.delete(deletePath);
1439
1440         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1441         assertEquals("Exists response", true, exists);
1442
1443         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1444         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1445
1446         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1447         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1448
1449         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1450                 new WriteModification(writePath2, writeNode2));
1451
1452         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1453                 new MergeModification(mergePath2, mergeNode2));
1454
1455         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1456
1457         InOrder inOrder = Mockito.inOrder(mockActorContext);
1458         inOrder.verify(mockActorContext).executeOperationAsync(
1459                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1460
1461         inOrder.verify(mockActorContext).executeOperationAsync(
1462                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1463
1464         inOrder.verify(mockActorContext).executeOperationAsync(
1465                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1466
1467         inOrder.verify(mockActorContext).executeOperationAsync(
1468                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1469
1470         inOrder.verify(mockActorContext).executeOperationAsync(
1471                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1472
1473         inOrder.verify(mockActorContext).executeOperationAsync(
1474                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1475     }
1476
1477     @Test
1478     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1479
1480         SchemaContext schemaContext = SchemaContextHelper.full();
1481         Configuration configuration = mock(Configuration.class);
1482         doReturn(configuration).when(mockActorContext).getConfiguration();
1483         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1484         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1485
1486         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1487         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1488
1489         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1490         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1491
1492         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1493
1494         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1495
1496         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1497
1498         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1499                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1500
1501         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1502
1503         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1504
1505         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1506
1507         @SuppressWarnings("unchecked")
1508         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1509
1510         for(NormalizedNode<?,?> node : collection){
1511             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1512         }
1513
1514         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1515                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1516
1517         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1518
1519         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1520                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1521
1522         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1523     }
1524
1525
1526     private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1527         ActorSystem actorSystem = getSystem();
1528         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1529
1530         doReturn(getSystem().actorSelection(shardActorRef.path())).
1531                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1532
1533         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1534                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1535
1536         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1537
1538         doReturn(actorSystem.actorSelection(txActorRef.path())).
1539                 when(mockActorContext).actorSelection(txActorRef.path().toString());
1540
1541         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1542                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1543                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1544
1545         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1546                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()), any(Timeout.class));
1547     }
1548 }