da47b222dfa083daa42746ae3ba77c7d81c12364
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Assert;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.mockito.Mockito;
49 import org.opendaylight.controller.cluster.access.concepts.MemberName;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
60 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
65 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
69 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
76 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Promise;
80
81 @SuppressWarnings("resource")
82 public class TransactionProxyTest extends AbstractTransactionProxyTest {
83
84     @SuppressWarnings("serial")
85     static class TestException extends RuntimeException {
86     }
87
88     interface Invoker {
89         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
90     }
91
92     @Test
93     public void testRead() throws Exception {
94         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
95
96         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
97
98         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
99                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
100
101         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
102                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
103
104         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
105
106         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
107
108         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
109                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
110
111         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
112
113         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
114
115         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
116     }
117
118     @Test(expected = ReadFailedException.class)
119     public void testReadWithInvalidReplyMessageType() throws Exception {
120         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
121
122         doReturn(Futures.successful(new Object())).when(mockActorContext)
123                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
124
125         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
126
127         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
128     }
129
130     @Test(expected = TestException.class)
131     public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
132         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
133
134         doReturn(Futures.failed(new TestException())).when(mockActorContext)
135                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
136
137         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
138
139         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
140     }
141
142     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
143             throws Exception {
144         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
145
146         if (exToThrow instanceof PrimaryNotFoundException) {
147             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
148         } else {
149             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
150                     .findPrimaryShardAsync(anyString());
151         }
152
153         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
154                 any(ActorSelection.class), any(), any(Timeout.class));
155
156         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
157
158         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
159     }
160
161     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
162         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
163     }
164
165     @Test(expected = PrimaryNotFoundException.class)
166     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
167         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
168     }
169
170     @Test(expected = TimeoutException.class)
171     public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
172         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
173                 new Exception("reason")));
174     }
175
176     @Test(expected = TestException.class)
177     public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
178         testReadWithExceptionOnInitialCreateTransaction(new TestException());
179     }
180
181     @Test
182     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
183         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
184
185         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
186
187         expectBatchedModifications(actorRef, 1);
188
189         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
190                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
191
192         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
193
194         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
195
196         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
197                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
198
199         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
200         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
201
202         InOrder inOrder = Mockito.inOrder(mockActorContext);
203         inOrder.verify(mockActorContext).executeOperationAsync(
204                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
205
206         inOrder.verify(mockActorContext).executeOperationAsync(
207                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
208     }
209
210     @Test(expected = IllegalStateException.class)
211     public void testReadPreConditionCheck() {
212         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
213         transactionProxy.read(TestModel.TEST_PATH);
214     }
215
216     @Test(expected = IllegalArgumentException.class)
217     public void testInvalidCreateTransactionReply() throws Exception {
218         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
219
220         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
221                 .actorSelection(actorRef.path().toString());
222
223         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
224                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
225
226         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
227             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
228             any(Timeout.class));
229
230         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
231
232         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
233     }
234
235     @Test
236     public void testExists() throws Exception {
237         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
238
239         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
240
241         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
242                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
243
244         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
245
246         assertEquals("Exists response", false, exists);
247
248         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
249                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
250
251         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
252
253         assertEquals("Exists response", true, exists);
254     }
255
256     @Test(expected = PrimaryNotFoundException.class)
257     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
258         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
259             proxy -> proxy.exists(TestModel.TEST_PATH));
260     }
261
262     @Test(expected = ReadFailedException.class)
263     public void testExistsWithInvalidReplyMessageType() throws Exception {
264         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
265
266         doReturn(Futures.successful(new Object())).when(mockActorContext)
267                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
268
269         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
270
271         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
272     }
273
274     @Test(expected = TestException.class)
275     public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
276         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
277
278         doReturn(Futures.failed(new TestException())).when(mockActorContext)
279                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
280
281         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
282
283         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
284     }
285
286     @Test
287     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
288         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
289
290         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
291
292         expectBatchedModifications(actorRef, 1);
293
294         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
295                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
296
297         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
298
299         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
300
301         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
302
303         assertEquals("Exists response", true, exists);
304
305         InOrder inOrder = Mockito.inOrder(mockActorContext);
306         inOrder.verify(mockActorContext).executeOperationAsync(
307                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
308
309         inOrder.verify(mockActorContext).executeOperationAsync(
310                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
311     }
312
313     @Test(expected = IllegalStateException.class)
314     public void testExistsPreConditionCheck() {
315         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
316         transactionProxy.exists(TestModel.TEST_PATH);
317     }
318
319     @Test
320     public void testWrite() throws Exception {
321         dataStoreContextBuilder.shardBatchedModificationCount(1);
322         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
323
324         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
325
326         expectBatchedModifications(actorRef, 1);
327
328         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
329
330         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
331
332         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
333     }
334
335     @Test
336     @SuppressWarnings("checkstyle:IllegalCatch")
337     public void testWriteAfterAsyncRead() throws Exception {
338         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
339                 DefaultShardStrategy.DEFAULT_SHARD);
340
341         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
342         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
343                 eq(getSystem().actorSelection(actorRef.path())),
344                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
345
346         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
347                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
348
349         expectBatchedModificationsReady(actorRef);
350
351         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
352
353         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
354
355         final CountDownLatch readComplete = new CountDownLatch(1);
356         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
357         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
358                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
359                     @Override
360                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
361                         try {
362                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
363                         } catch (Exception e) {
364                             caughtEx.set(e);
365                         } finally {
366                             readComplete.countDown();
367                         }
368                     }
369
370                     @Override
371                     public void onFailure(final Throwable failure) {
372                         caughtEx.set(failure);
373                         readComplete.countDown();
374                     }
375                 }, MoreExecutors.directExecutor());
376
377         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
378
379         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
380
381         final Throwable t = caughtEx.get();
382         if (t != null) {
383             Throwables.propagateIfPossible(t, Exception.class);
384             throw new RuntimeException(t);
385         }
386
387         // This sends the batched modification.
388         transactionProxy.ready();
389
390         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
391     }
392
393     @Test(expected = IllegalStateException.class)
394     public void testWritePreConditionCheck() {
395         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
396         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
397     }
398
399     @Test(expected = IllegalStateException.class)
400     public void testWriteAfterReadyPreConditionCheck() {
401         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
402
403         transactionProxy.ready();
404
405         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
406     }
407
408     @Test
409     public void testMerge() throws Exception {
410         dataStoreContextBuilder.shardBatchedModificationCount(1);
411         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
412
413         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
414
415         expectBatchedModifications(actorRef, 1);
416
417         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
418
419         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
420
421         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
422     }
423
424     @Test
425     public void testDelete() throws Exception {
426         dataStoreContextBuilder.shardBatchedModificationCount(1);
427         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
428
429         expectBatchedModifications(actorRef, 1);
430
431         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
432
433         transactionProxy.delete(TestModel.TEST_PATH);
434
435         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
436     }
437
438     @Test
439     public void testReadWrite() throws Exception {
440         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
441
442         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
443
444         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
445                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
446
447         expectBatchedModifications(actorRef, 1);
448
449         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
450
451         transactionProxy.read(TestModel.TEST_PATH);
452
453         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
454
455         transactionProxy.read(TestModel.TEST_PATH);
456
457         transactionProxy.read(TestModel.TEST_PATH);
458
459         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
460         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
461
462         verifyBatchedModifications(batchedModifications.get(0), false,
463                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
464     }
465
466     @Test
467     public void testReadyWithReadWrite() throws Exception {
468         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
469
470         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
471
472         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
473                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
474
475         expectBatchedModificationsReady(actorRef, true);
476
477         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
478
479         transactionProxy.read(TestModel.TEST_PATH);
480
481         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
482
483         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
484
485         assertTrue(ready instanceof SingleCommitCohortProxy);
486
487         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
488
489         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
490         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
491
492         verifyBatchedModifications(batchedModifications.get(0), true, true,
493                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
494
495         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
496     }
497
498     @Test
499     public void testReadyWithNoModifications() throws Exception {
500         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
501
502         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
503                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
504
505         expectBatchedModificationsReady(actorRef, true);
506
507         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
508
509         transactionProxy.read(TestModel.TEST_PATH);
510
511         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
512
513         assertTrue(ready instanceof SingleCommitCohortProxy);
514
515         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
516
517         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
518         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
519
520         verifyBatchedModifications(batchedModifications.get(0), true, true);
521     }
522
523     @Test
524     public void testReadyWithMultipleShardWrites() throws Exception {
525         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
526
527         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
528
529         expectBatchedModificationsReady(actorRef1);
530         expectBatchedModificationsReady(actorRef2);
531
532         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
533
534         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
535         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
536
537         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
538
539         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
540
541         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
542                 actorSelection(actorRef2));
543     }
544
545     @Test
546     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
547         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
548
549         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
550
551         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
552
553         expectBatchedModificationsReady(actorRef, true);
554
555         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
556
557         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
558
559         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
560
561         assertTrue(ready instanceof SingleCommitCohortProxy);
562
563         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
564
565         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
566         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
567
568         verifyBatchedModifications(batchedModifications.get(0), true, true,
569                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
570     }
571
572     @Test
573     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
574         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
575         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
576
577         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
578
579         expectBatchedModificationsReady(actorRef, true);
580
581         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
582
583         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
584
585         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
586
587         assertTrue(ready instanceof SingleCommitCohortProxy);
588
589         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
590
591         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
592         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
593
594         verifyBatchedModifications(batchedModifications.get(0), false,
595                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
596
597         verifyBatchedModifications(batchedModifications.get(1), true, true);
598     }
599
600     @Test
601     public void testReadyWithReplyFailure() throws Exception {
602         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
603
604         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
605
606         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
607
608         expectFailedBatchedModifications(actorRef);
609
610         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
611
612         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
613
614         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
615
616         assertTrue(ready instanceof SingleCommitCohortProxy);
617
618         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
619     }
620
621     @Test
622     public void testReadyWithDebugContextEnabled() throws Exception {
623         dataStoreContextBuilder.transactionDebugContextEnabled(true);
624
625         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
626
627         expectBatchedModificationsReady(actorRef, true);
628
629         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
630
631         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
632
633         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
634
635         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
636
637         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
638     }
639
640     @Test
641     public void testReadyWithLocalTransaction() throws Exception {
642         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
643
644         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
645                 .actorSelection(shardActorRef.path().toString());
646
647         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
648                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
649
650         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
651
652         expectReadyLocalTransaction(shardActorRef, true);
653
654         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
655         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
656
657         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
658         assertTrue(ready instanceof SingleCommitCohortProxy);
659         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
660     }
661
662     @Test
663     public void testReadyWithLocalTransactionWithFailure() throws Exception {
664         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
665
666         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
667                 .actorSelection(shardActorRef.path().toString());
668
669         DataTree mockDataTree = createDataTree();
670         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
671         doThrow(new RuntimeException("mock")).when(mockModification).ready();
672
673         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
674                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
675
676         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
677
678         expectReadyLocalTransaction(shardActorRef, true);
679
680         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
681         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
682
683         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684         assertTrue(ready instanceof SingleCommitCohortProxy);
685         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
686     }
687
688     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
689         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
690
691         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
692
693         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
694
695         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
696
697         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
698
699         transactionProxy.delete(TestModel.TEST_PATH);
700
701         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
702
703         assertTrue(ready instanceof SingleCommitCohortProxy);
704
705         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
706     }
707
708     @Test
709     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
710         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
711     }
712
713     @Test
714     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
715         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
716     }
717
718     @Test
719     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
720         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
721     }
722
723     @Test
724     public void testReadyWithInvalidReplyMessageType() throws Exception {
725         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
726         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
727
728         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
729
730         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
731                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
732
733         expectBatchedModificationsReady(actorRef2);
734
735         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
736
737         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
738         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
739
740         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
741
742         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
743
744         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
745                 IllegalArgumentException.class);
746     }
747
748     @Test
749     public void testGetIdentifier() {
750         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
751         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
752
753         Object id = transactionProxy.getIdentifier();
754         assertNotNull("getIdentifier returned null", id);
755         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
756     }
757
758     @Test
759     public void testClose() throws Exception {
760         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
761
762         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
763                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
764
765         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
766
767         transactionProxy.read(TestModel.TEST_PATH);
768
769         transactionProxy.close();
770
771         verify(mockActorContext).sendOperationAsync(
772                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
773     }
774
775     private interface TransactionProxyOperation {
776         void run(TransactionProxy transactionProxy);
777     }
778
779     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
780         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
781     }
782
783     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
784         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
785                 dataTree);
786     }
787
788     private void throttleOperation(final TransactionProxyOperation operation) {
789         throttleOperation(operation, 1, true);
790     }
791
792     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
793             final boolean shardFound) {
794         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
795                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
796     }
797
798     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
799             final boolean shardFound, final long expectedCompletionTime) {
800         ActorSystem actorSystem = getSystem();
801         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
802
803         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
804         // we now allow one extra permit to be allowed for ready
805         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
806                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
807                         .getDatastoreContext();
808
809         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
810                 .actorSelection(shardActorRef.path().toString());
811
812         if (shardFound) {
813             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
814                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
815             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
816                     .findPrimaryShardAsync(eq("cars"));
817
818         } else {
819             doReturn(Futures.failed(new Exception("not found")))
820                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
821         }
822
823         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
824                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
825                 any(Timeout.class));
826
827         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
828
829         long start = System.nanoTime();
830
831         operation.run(transactionProxy);
832
833         long end = System.nanoTime();
834
835         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
836                 expectedCompletionTime, end - start),
837                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
838
839     }
840
841     private void completeOperation(final TransactionProxyOperation operation) {
842         completeOperation(operation, true);
843     }
844
845     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
846         ActorSystem actorSystem = getSystem();
847         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
848
849         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
850                 .actorSelection(shardActorRef.path().toString());
851
852         if (shardFound) {
853             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
854                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
855         } else {
856             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
857                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
858         }
859
860         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
861         String actorPath = txActorRef.path().toString();
862         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
863                 DataStoreVersions.CURRENT_VERSION);
864
865         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
866
867         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
868                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
869                 any(Timeout.class));
870
871         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
872
873         long start = System.nanoTime();
874
875         operation.run(transactionProxy);
876
877         long end = System.nanoTime();
878
879         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
880                 .getOperationTimeoutInMillis());
881         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
882                 expected, end - start), end - start <= expected);
883     }
884
885     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
886         ActorSystem actorSystem = getSystem();
887         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
888
889         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
890                 .actorSelection(shardActorRef.path().toString());
891
892         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
893                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
894
895         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
896
897         long start = System.nanoTime();
898
899         operation.run(transactionProxy);
900
901         long end = System.nanoTime();
902
903         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
904                 .getOperationTimeoutInMillis());
905         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
906                 end - start <= expected);
907     }
908
909     private static DataTree createDataTree() {
910         DataTree dataTree = mock(DataTree.class);
911         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
912         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
913
914         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
915         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
916
917         return dataTree;
918     }
919
920     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
921         DataTree dataTree = mock(DataTree.class);
922         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
923         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
924
925         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
926         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
927         doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
928             any(YangInstanceIdentifier.class));
929
930         return dataTree;
931     }
932
933
934     @Test
935     public void testWriteCompletionForLocalShard() {
936         completeOperationLocal(transactionProxy -> {
937             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
938
939             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
940
941             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
942
943         }, createDataTree());
944     }
945
946     @Test
947     public void testWriteThrottlingWhenShardFound() {
948         throttleOperation(transactionProxy -> {
949             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
950
951             expectIncompleteBatchedModifications();
952
953             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
954
955             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
956         });
957     }
958
959     @Test
960     public void testWriteThrottlingWhenShardNotFound() {
961         // Confirm that there is no throttling when the Shard is not found
962         completeOperation(transactionProxy -> {
963             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
964
965             expectBatchedModifications(2);
966
967             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
968
969             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
970         }, false);
971
972     }
973
974
975     @Test
976     public void testWriteCompletion() {
977         completeOperation(transactionProxy -> {
978             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
979
980             expectBatchedModifications(2);
981
982             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
983
984             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
985         });
986     }
987
988     @Test
989     public void testMergeThrottlingWhenShardFound() {
990         throttleOperation(transactionProxy -> {
991             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992
993             expectIncompleteBatchedModifications();
994
995             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
996
997             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
998         });
999     }
1000
1001     @Test
1002     public void testMergeThrottlingWhenShardNotFound() {
1003         completeOperation(transactionProxy -> {
1004             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1005
1006             expectBatchedModifications(2);
1007
1008             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1009
1010             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1011         }, false);
1012     }
1013
1014     @Test
1015     public void testMergeCompletion() {
1016         completeOperation(transactionProxy -> {
1017             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1018
1019             expectBatchedModifications(2);
1020
1021             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1022
1023             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1024         });
1025
1026     }
1027
1028     @Test
1029     public void testMergeCompletionForLocalShard() {
1030         completeOperationLocal(transactionProxy -> {
1031             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1032
1033             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1034
1035             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1036
1037         }, createDataTree());
1038     }
1039
1040
1041     @Test
1042     public void testDeleteThrottlingWhenShardFound() {
1043
1044         throttleOperation(transactionProxy -> {
1045             expectIncompleteBatchedModifications();
1046
1047             transactionProxy.delete(TestModel.TEST_PATH);
1048
1049             transactionProxy.delete(TestModel.TEST_PATH);
1050         });
1051     }
1052
1053
1054     @Test
1055     public void testDeleteThrottlingWhenShardNotFound() {
1056
1057         completeOperation(transactionProxy -> {
1058             expectBatchedModifications(2);
1059
1060             transactionProxy.delete(TestModel.TEST_PATH);
1061
1062             transactionProxy.delete(TestModel.TEST_PATH);
1063         }, false);
1064     }
1065
1066     @Test
1067     public void testDeleteCompletionForLocalShard() {
1068         completeOperationLocal(transactionProxy -> {
1069
1070             transactionProxy.delete(TestModel.TEST_PATH);
1071
1072             transactionProxy.delete(TestModel.TEST_PATH);
1073         }, createDataTree());
1074
1075     }
1076
1077     @Test
1078     public void testDeleteCompletion() {
1079         completeOperation(transactionProxy -> {
1080             expectBatchedModifications(2);
1081
1082             transactionProxy.delete(TestModel.TEST_PATH);
1083
1084             transactionProxy.delete(TestModel.TEST_PATH);
1085         });
1086
1087     }
1088
1089     @Test
1090     public void testReadThrottlingWhenShardFound() {
1091
1092         throttleOperation(transactionProxy -> {
1093             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1094                     any(ActorSelection.class), eqReadData());
1095
1096             transactionProxy.read(TestModel.TEST_PATH);
1097
1098             transactionProxy.read(TestModel.TEST_PATH);
1099         });
1100     }
1101
1102     @Test
1103     public void testReadThrottlingWhenShardNotFound() {
1104
1105         completeOperation(transactionProxy -> {
1106             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1107                     any(ActorSelection.class), eqReadData());
1108
1109             transactionProxy.read(TestModel.TEST_PATH);
1110
1111             transactionProxy.read(TestModel.TEST_PATH);
1112         }, false);
1113     }
1114
1115
1116     @Test
1117     public void testReadCompletion() {
1118         completeOperation(transactionProxy -> {
1119             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1120
1121             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1122                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1123
1124             transactionProxy.read(TestModel.TEST_PATH);
1125
1126             transactionProxy.read(TestModel.TEST_PATH);
1127         });
1128
1129     }
1130
1131     @Test
1132     public void testReadCompletionForLocalShard() {
1133         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1134         completeOperationLocal(transactionProxy -> {
1135             transactionProxy.read(TestModel.TEST_PATH);
1136
1137             transactionProxy.read(TestModel.TEST_PATH);
1138         }, createDataTree(nodeToRead));
1139
1140     }
1141
1142     @Test
1143     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1144         completeOperationLocal(transactionProxy -> {
1145             transactionProxy.read(TestModel.TEST_PATH);
1146
1147             transactionProxy.read(TestModel.TEST_PATH);
1148         }, createDataTree());
1149
1150     }
1151
1152     @Test
1153     public void testExistsThrottlingWhenShardFound() {
1154
1155         throttleOperation(transactionProxy -> {
1156             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1157                     any(ActorSelection.class), eqDataExists());
1158
1159             transactionProxy.exists(TestModel.TEST_PATH);
1160
1161             transactionProxy.exists(TestModel.TEST_PATH);
1162         });
1163     }
1164
1165     @Test
1166     public void testExistsThrottlingWhenShardNotFound() {
1167
1168         completeOperation(transactionProxy -> {
1169             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1170                     any(ActorSelection.class), eqDataExists());
1171
1172             transactionProxy.exists(TestModel.TEST_PATH);
1173
1174             transactionProxy.exists(TestModel.TEST_PATH);
1175         }, false);
1176     }
1177
1178
1179     @Test
1180     public void testExistsCompletion() {
1181         completeOperation(transactionProxy -> {
1182             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1183                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1184
1185             transactionProxy.exists(TestModel.TEST_PATH);
1186
1187             transactionProxy.exists(TestModel.TEST_PATH);
1188         });
1189
1190     }
1191
1192     @Test
1193     public void testExistsCompletionForLocalShard() {
1194         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1195         completeOperationLocal(transactionProxy -> {
1196             transactionProxy.exists(TestModel.TEST_PATH);
1197
1198             transactionProxy.exists(TestModel.TEST_PATH);
1199         }, createDataTree(nodeToRead));
1200
1201     }
1202
1203     @Test
1204     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1205         completeOperationLocal(transactionProxy -> {
1206             transactionProxy.exists(TestModel.TEST_PATH);
1207
1208             transactionProxy.exists(TestModel.TEST_PATH);
1209         }, createDataTree());
1210
1211     }
1212
1213     @Test
1214     public void testReadyThrottling() {
1215
1216         throttleOperation(transactionProxy -> {
1217             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218
1219             expectBatchedModifications(1);
1220
1221             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1222
1223             transactionProxy.ready();
1224         });
1225     }
1226
1227     @Test
1228     public void testReadyThrottlingWithTwoTransactionContexts() {
1229         throttleOperation(transactionProxy -> {
1230             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1231             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1232
1233             expectBatchedModifications(2);
1234
1235             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1236
1237             // Trying to write to Cars will cause another transaction context to get created
1238             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1239
1240             // Now ready should block for both transaction contexts
1241             transactionProxy.ready();
1242         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1243                 .getOperationTimeoutInMillis()) * 2);
1244     }
1245
1246     private void testModificationOperationBatching(final TransactionType type) throws Exception {
1247         int shardBatchedModificationCount = 3;
1248         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1249
1250         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1251
1252         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1253
1254         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1255         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1256
1257         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1258         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1259
1260         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1261         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1262
1263         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1264         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1265
1266         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1267         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1268
1269         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1270         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1271
1272         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1273         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1274
1275         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1276
1277         transactionProxy.write(writePath1, writeNode1);
1278         transactionProxy.write(writePath2, writeNode2);
1279         transactionProxy.delete(deletePath1);
1280         transactionProxy.merge(mergePath1, mergeNode1);
1281         transactionProxy.merge(mergePath2, mergeNode2);
1282         transactionProxy.write(writePath3, writeNode3);
1283         transactionProxy.merge(mergePath3, mergeNode3);
1284         transactionProxy.delete(deletePath2);
1285
1286         // This sends the last batch.
1287         transactionProxy.ready();
1288
1289         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1290         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1291
1292         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1293                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1294
1295         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1296                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1297
1298         verifyBatchedModifications(batchedModifications.get(2), true, true,
1299                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1300
1301         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1302     }
1303
1304     @Test
1305     public void testReadWriteModificationOperationBatching() throws Exception {
1306         testModificationOperationBatching(READ_WRITE);
1307     }
1308
1309     @Test
1310     public void testWriteOnlyModificationOperationBatching() throws Exception {
1311         testModificationOperationBatching(WRITE_ONLY);
1312     }
1313
1314     @Test
1315     public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1316         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1317         testModificationOperationBatching(WRITE_ONLY);
1318     }
1319
1320     @Test
1321     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1322
1323         int shardBatchedModificationCount = 10;
1324         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1325
1326         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1327
1328         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1329
1330         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1331         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1332
1333         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1334         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1335
1336         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1337         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1338
1339         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1340         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1341
1342         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1343
1344         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1345                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1346
1347         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1348                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1349
1350         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1351                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1352
1353         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1354
1355         transactionProxy.write(writePath1, writeNode1);
1356         transactionProxy.write(writePath2, writeNode2);
1357
1358         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1359
1360         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1361         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1362
1363         transactionProxy.merge(mergePath1, mergeNode1);
1364         transactionProxy.merge(mergePath2, mergeNode2);
1365
1366         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1367
1368         transactionProxy.delete(deletePath);
1369
1370         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1371         assertEquals("Exists response", true, exists);
1372
1373         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1374         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1375
1376         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1377         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1378
1379         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1380                 new WriteModification(writePath2, writeNode2));
1381
1382         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1383                 new MergeModification(mergePath2, mergeNode2));
1384
1385         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1386
1387         InOrder inOrder = Mockito.inOrder(mockActorContext);
1388         inOrder.verify(mockActorContext).executeOperationAsync(
1389                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1390
1391         inOrder.verify(mockActorContext).executeOperationAsync(
1392                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1393
1394         inOrder.verify(mockActorContext).executeOperationAsync(
1395                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1396
1397         inOrder.verify(mockActorContext).executeOperationAsync(
1398                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1399
1400         inOrder.verify(mockActorContext).executeOperationAsync(
1401                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1402
1403         inOrder.verify(mockActorContext).executeOperationAsync(
1404                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1405     }
1406
1407     @Test
1408     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1409             java.util.concurrent.TimeoutException {
1410         SchemaContext schemaContext = SchemaContextHelper.full();
1411         Configuration configuration = mock(Configuration.class);
1412         doReturn(configuration).when(mockActorContext).getConfiguration();
1413         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1414         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1415
1416         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1417         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1418
1419         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1420         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1421
1422         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1423
1424         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1425
1426         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1427
1428         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1429                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1430
1431         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1432
1433         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1434
1435         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1436
1437         @SuppressWarnings("unchecked")
1438         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1439
1440         for (NormalizedNode<?,?> node : collection) {
1441             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1442         }
1443
1444         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1445                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1446
1447         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1448
1449         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1450                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1451
1452         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1453     }
1454
1455
1456     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1457         ActorSystem actorSystem = getSystem();
1458         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1459
1460         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1461                 .actorSelection(shardActorRef.path().toString());
1462
1463         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1464                 .findPrimaryShardAsync(eq(shardName));
1465
1466         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1467
1468         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1469                 .actorSelection(txActorRef.path().toString());
1470
1471         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1472                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1473                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1474
1475         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1476                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1477     }
1478 }