ca804fdeee4db8f1fb2a4b5a7ddb0ae70cd9c14f
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Assert;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.mockito.Mockito;
49 import org.opendaylight.controller.cluster.access.concepts.MemberName;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
60 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
65 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
69 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
76 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Promise;
80
81 @SuppressWarnings("resource")
82 public class TransactionProxyTest extends AbstractTransactionProxyTest {
83
84     @SuppressWarnings("serial")
85     static class TestException extends RuntimeException {
86     }
87
88     interface Invoker {
89         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
90     }
91
92     @Test
93     public void testRead() throws Exception {
94         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
95
96         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
97
98         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
99                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
100
101         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
102                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
103
104         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
105
106         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
107
108         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
109                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
110
111         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
112
113         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
114
115         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
116     }
117
118     @Test(expected = ReadFailedException.class)
119     public void testReadWithInvalidReplyMessageType() throws Exception {
120         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
121
122         doReturn(Futures.successful(new Object())).when(mockActorContext)
123                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
124
125         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
126
127         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
128     }
129
130     @Test(expected = TestException.class)
131     public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
132         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
133
134         doReturn(Futures.failed(new TestException())).when(mockActorContext)
135                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
136
137         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
138
139         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
140     }
141
142     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
143             throws Exception {
144         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
145
146         if (exToThrow instanceof PrimaryNotFoundException) {
147             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
148         } else {
149             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
150                     .findPrimaryShardAsync(anyString());
151         }
152
153         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
154                 any(ActorSelection.class), any(), any(Timeout.class));
155
156         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
157
158         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
159     }
160
161     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
162         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
163     }
164
165     @Test(expected = PrimaryNotFoundException.class)
166     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
167         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
168     }
169
170     @Test(expected = TimeoutException.class)
171     public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
172         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
173                 new Exception("reason")));
174     }
175
176     @Test(expected = TestException.class)
177     public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
178         testReadWithExceptionOnInitialCreateTransaction(new TestException());
179     }
180
181     @Test
182     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
183         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
184
185         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
186
187         expectBatchedModifications(actorRef, 1);
188
189         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
190                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
191
192         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
193
194         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
195
196         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
197                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
198
199         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
200         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
201
202         InOrder inOrder = Mockito.inOrder(mockActorContext);
203         inOrder.verify(mockActorContext).executeOperationAsync(
204                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
205
206         inOrder.verify(mockActorContext).executeOperationAsync(
207                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
208     }
209
210     @Test(expected = IllegalStateException.class)
211     public void testReadPreConditionCheck() {
212         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
213         transactionProxy.read(TestModel.TEST_PATH);
214     }
215
216     @Test(expected = IllegalArgumentException.class)
217     public void testInvalidCreateTransactionReply() throws Exception {
218         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
219
220         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
221                 .actorSelection(actorRef.path().toString());
222
223         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
224                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
225
226         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
227             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
228             any(Timeout.class));
229
230         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
231
232         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
233     }
234
235     @Test
236     public void testExists() throws Exception {
237         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
238
239         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
240
241         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
242                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
243
244         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
245
246         assertEquals("Exists response", false, exists);
247
248         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
249                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
250
251         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
252
253         assertEquals("Exists response", true, exists);
254     }
255
256     @Test(expected = PrimaryNotFoundException.class)
257     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
258         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
259             proxy -> proxy.exists(TestModel.TEST_PATH));
260     }
261
262     @Test(expected = ReadFailedException.class)
263     public void testExistsWithInvalidReplyMessageType() throws Exception {
264         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
265
266         doReturn(Futures.successful(new Object())).when(mockActorContext)
267                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
268
269         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
270
271         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
272     }
273
274     @Test(expected = TestException.class)
275     public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
276         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
277
278         doReturn(Futures.failed(new TestException())).when(mockActorContext)
279                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
280
281         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
282
283         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
284     }
285
286     @Test
287     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
288         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
289
290         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
291
292         expectBatchedModifications(actorRef, 1);
293
294         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
295                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
296
297         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
298
299         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
300
301         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
302
303         assertEquals("Exists response", true, exists);
304
305         InOrder inOrder = Mockito.inOrder(mockActorContext);
306         inOrder.verify(mockActorContext).executeOperationAsync(
307                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
308
309         inOrder.verify(mockActorContext).executeOperationAsync(
310                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
311     }
312
313     @Test(expected = IllegalStateException.class)
314     public void testExistsPreConditionCheck() {
315         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
316         transactionProxy.exists(TestModel.TEST_PATH);
317     }
318
319     @Test
320     public void testWrite() throws Exception {
321         dataStoreContextBuilder.shardBatchedModificationCount(1);
322         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
323
324         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
325
326         expectBatchedModifications(actorRef, 1);
327
328         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
329
330         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
331
332         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
333     }
334
335     @Test
336     @SuppressWarnings("checkstyle:IllegalCatch")
337     public void testWriteAfterAsyncRead() throws Exception {
338         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
339                 DefaultShardStrategy.DEFAULT_SHARD);
340
341         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
342         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
343                 eq(getSystem().actorSelection(actorRef.path())),
344                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
345
346         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
347                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
348
349         expectBatchedModificationsReady(actorRef);
350
351         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
352
353         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
354
355         final CountDownLatch readComplete = new CountDownLatch(1);
356         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
357         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
358                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
359                     @Override
360                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
361                         try {
362                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
363                         } catch (Exception e) {
364                             caughtEx.set(e);
365                         } finally {
366                             readComplete.countDown();
367                         }
368                     }
369
370                     @Override
371                     public void onFailure(final Throwable failure) {
372                         caughtEx.set(failure);
373                         readComplete.countDown();
374                     }
375                 }, MoreExecutors.directExecutor());
376
377         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
378
379         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
380
381         if (caughtEx.get() != null) {
382             Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
383             Throwables.propagate(caughtEx.get());
384         }
385
386         // This sends the batched modification.
387         transactionProxy.ready();
388
389         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
390     }
391
392     @Test(expected = IllegalStateException.class)
393     public void testWritePreConditionCheck() {
394         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
395         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
396     }
397
398     @Test(expected = IllegalStateException.class)
399     public void testWriteAfterReadyPreConditionCheck() {
400         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
401
402         transactionProxy.ready();
403
404         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
405     }
406
407     @Test
408     public void testMerge() throws Exception {
409         dataStoreContextBuilder.shardBatchedModificationCount(1);
410         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
411
412         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
413
414         expectBatchedModifications(actorRef, 1);
415
416         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
417
418         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
419
420         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
421     }
422
423     @Test
424     public void testDelete() throws Exception {
425         dataStoreContextBuilder.shardBatchedModificationCount(1);
426         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
427
428         expectBatchedModifications(actorRef, 1);
429
430         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
431
432         transactionProxy.delete(TestModel.TEST_PATH);
433
434         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
435     }
436
437     @Test
438     public void testReadWrite() throws Exception {
439         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
440
441         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
442
443         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
444                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
445
446         expectBatchedModifications(actorRef, 1);
447
448         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
449
450         transactionProxy.read(TestModel.TEST_PATH);
451
452         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
453
454         transactionProxy.read(TestModel.TEST_PATH);
455
456         transactionProxy.read(TestModel.TEST_PATH);
457
458         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
459         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
460
461         verifyBatchedModifications(batchedModifications.get(0), false,
462                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
463     }
464
465     @Test
466     public void testReadyWithReadWrite() throws Exception {
467         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
468
469         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
470
471         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
472                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
473
474         expectBatchedModificationsReady(actorRef, true);
475
476         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
477
478         transactionProxy.read(TestModel.TEST_PATH);
479
480         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
481
482         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
483
484         assertTrue(ready instanceof SingleCommitCohortProxy);
485
486         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
487
488         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
489         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
490
491         verifyBatchedModifications(batchedModifications.get(0), true, true,
492                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
493
494         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
495     }
496
497     @Test
498     public void testReadyWithNoModifications() throws Exception {
499         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
500
501         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
502                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
503
504         expectBatchedModificationsReady(actorRef, true);
505
506         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
507
508         transactionProxy.read(TestModel.TEST_PATH);
509
510         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
511
512         assertTrue(ready instanceof SingleCommitCohortProxy);
513
514         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
515
516         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
517         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
518
519         verifyBatchedModifications(batchedModifications.get(0), true, true);
520     }
521
522     @Test
523     public void testReadyWithMultipleShardWrites() throws Exception {
524         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
525
526         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
527
528         expectBatchedModificationsReady(actorRef1);
529         expectBatchedModificationsReady(actorRef2);
530
531         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
532
533         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
534         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
535
536         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
537
538         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
539
540         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
541                 actorSelection(actorRef2));
542     }
543
544     @Test
545     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
546         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
547
548         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
549
550         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
551
552         expectBatchedModificationsReady(actorRef, true);
553
554         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
555
556         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
557
558         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
559
560         assertTrue(ready instanceof SingleCommitCohortProxy);
561
562         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
563
564         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
565         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
566
567         verifyBatchedModifications(batchedModifications.get(0), true, true,
568                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
569     }
570
571     @Test
572     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
573         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
574         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
575
576         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
577
578         expectBatchedModificationsReady(actorRef, true);
579
580         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
581
582         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
583
584         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
585
586         assertTrue(ready instanceof SingleCommitCohortProxy);
587
588         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
589
590         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
591         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
592
593         verifyBatchedModifications(batchedModifications.get(0), false,
594                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
595
596         verifyBatchedModifications(batchedModifications.get(1), true, true);
597     }
598
599     @Test
600     public void testReadyWithReplyFailure() throws Exception {
601         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
602
603         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
604
605         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
606
607         expectFailedBatchedModifications(actorRef);
608
609         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
610
611         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
612
613         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
614
615         assertTrue(ready instanceof SingleCommitCohortProxy);
616
617         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
618     }
619
620     @Test
621     public void testReadyWithDebugContextEnabled() throws Exception {
622         dataStoreContextBuilder.transactionDebugContextEnabled(true);
623
624         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
625
626         expectBatchedModificationsReady(actorRef, true);
627
628         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
629
630         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
631
632         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
633
634         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
635
636         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
637     }
638
639     @Test
640     public void testReadyWithLocalTransaction() throws Exception {
641         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
642
643         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
644                 .actorSelection(shardActorRef.path().toString());
645
646         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
647                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
648
649         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
650
651         expectReadyLocalTransaction(shardActorRef, true);
652
653         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
654         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
655
656         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
657         assertTrue(ready instanceof SingleCommitCohortProxy);
658         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
659     }
660
661     @Test
662     public void testReadyWithLocalTransactionWithFailure() throws Exception {
663         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
664
665         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
666                 .actorSelection(shardActorRef.path().toString());
667
668         DataTree mockDataTree = createDataTree();
669         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
670         doThrow(new RuntimeException("mock")).when(mockModification).ready();
671
672         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
673                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
674
675         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
676
677         expectReadyLocalTransaction(shardActorRef, true);
678
679         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
680         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
681
682         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
683         assertTrue(ready instanceof SingleCommitCohortProxy);
684         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
685     }
686
687     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
688         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
689
690         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
691
692         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
693
694         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
695
696         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
697
698         transactionProxy.delete(TestModel.TEST_PATH);
699
700         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
701
702         assertTrue(ready instanceof SingleCommitCohortProxy);
703
704         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
705     }
706
707     @Test
708     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
709         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
710     }
711
712     @Test
713     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
714         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
715     }
716
717     @Test
718     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
719         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
720     }
721
722     @Test
723     public void testReadyWithInvalidReplyMessageType() throws Exception {
724         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
725         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
726
727         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
728
729         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
730                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
731
732         expectBatchedModificationsReady(actorRef2);
733
734         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
735
736         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
737         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
738
739         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
740
741         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
742
743         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
744                 IllegalArgumentException.class);
745     }
746
747     @Test
748     public void testGetIdentifier() {
749         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
750         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
751
752         Object id = transactionProxy.getIdentifier();
753         assertNotNull("getIdentifier returned null", id);
754         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
755     }
756
757     @Test
758     public void testClose() throws Exception {
759         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
760
761         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
762                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
763
764         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
765
766         transactionProxy.read(TestModel.TEST_PATH);
767
768         transactionProxy.close();
769
770         verify(mockActorContext).sendOperationAsync(
771                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
772     }
773
774     private interface TransactionProxyOperation {
775         void run(TransactionProxy transactionProxy);
776     }
777
778     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
779         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
780     }
781
782     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
783         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
784                 dataTree);
785     }
786
787     private void throttleOperation(final TransactionProxyOperation operation) {
788         throttleOperation(operation, 1, true);
789     }
790
791     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
792             final boolean shardFound) {
793         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
794                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
795     }
796
797     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
798             final boolean shardFound, final long expectedCompletionTime) {
799         ActorSystem actorSystem = getSystem();
800         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
801
802         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
803         // we now allow one extra permit to be allowed for ready
804         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
805                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
806                         .getDatastoreContext();
807
808         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
809                 .actorSelection(shardActorRef.path().toString());
810
811         if (shardFound) {
812             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
813                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
814             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
815                     .findPrimaryShardAsync(eq("cars"));
816
817         } else {
818             doReturn(Futures.failed(new Exception("not found")))
819                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
820         }
821
822         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
823                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
824                 any(Timeout.class));
825
826         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
827
828         long start = System.nanoTime();
829
830         operation.run(transactionProxy);
831
832         long end = System.nanoTime();
833
834         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
835                 expectedCompletionTime, end - start),
836                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
837
838     }
839
840     private void completeOperation(final TransactionProxyOperation operation) {
841         completeOperation(operation, true);
842     }
843
844     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
845         ActorSystem actorSystem = getSystem();
846         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
847
848         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
849                 .actorSelection(shardActorRef.path().toString());
850
851         if (shardFound) {
852             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
853                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
854         } else {
855             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
856                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
857         }
858
859         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
860         String actorPath = txActorRef.path().toString();
861         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
862                 DataStoreVersions.CURRENT_VERSION);
863
864         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
865
866         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
867                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
868                 any(Timeout.class));
869
870         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
871
872         long start = System.nanoTime();
873
874         operation.run(transactionProxy);
875
876         long end = System.nanoTime();
877
878         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
879                 .getOperationTimeoutInMillis());
880         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
881                 expected, end - start), end - start <= expected);
882     }
883
884     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
885         ActorSystem actorSystem = getSystem();
886         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
887
888         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
889                 .actorSelection(shardActorRef.path().toString());
890
891         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
892                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
893
894         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
895
896         long start = System.nanoTime();
897
898         operation.run(transactionProxy);
899
900         long end = System.nanoTime();
901
902         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
903                 .getOperationTimeoutInMillis());
904         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
905                 end - start <= expected);
906     }
907
908     private static DataTree createDataTree() {
909         DataTree dataTree = mock(DataTree.class);
910         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
911         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
912
913         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
914         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
915
916         return dataTree;
917     }
918
919     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
920         DataTree dataTree = mock(DataTree.class);
921         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
922         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
923
924         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
925         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
926         doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
927
928         return dataTree;
929     }
930
931
932     @Test
933     public void testWriteCompletionForLocalShard() {
934         completeOperationLocal(transactionProxy -> {
935             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
936
937             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
938
939             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
940
941         }, createDataTree());
942     }
943
944     @Test
945     public void testWriteThrottlingWhenShardFound() {
946         throttleOperation(transactionProxy -> {
947             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
948
949             expectIncompleteBatchedModifications();
950
951             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
952
953             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
954         });
955     }
956
957     @Test
958     public void testWriteThrottlingWhenShardNotFound() {
959         // Confirm that there is no throttling when the Shard is not found
960         completeOperation(transactionProxy -> {
961             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
962
963             expectBatchedModifications(2);
964
965             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
966
967             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
968         }, false);
969
970     }
971
972
973     @Test
974     public void testWriteCompletion() {
975         completeOperation(transactionProxy -> {
976             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
977
978             expectBatchedModifications(2);
979
980             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
981
982             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
983         });
984     }
985
986     @Test
987     public void testMergeThrottlingWhenShardFound() {
988         throttleOperation(transactionProxy -> {
989             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
990
991             expectIncompleteBatchedModifications();
992
993             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
994
995             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
996         });
997     }
998
999     @Test
1000     public void testMergeThrottlingWhenShardNotFound() {
1001         completeOperation(transactionProxy -> {
1002             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1003
1004             expectBatchedModifications(2);
1005
1006             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1007
1008             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1009         }, false);
1010     }
1011
1012     @Test
1013     public void testMergeCompletion() {
1014         completeOperation(transactionProxy -> {
1015             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1016
1017             expectBatchedModifications(2);
1018
1019             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1020
1021             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1022         });
1023
1024     }
1025
1026     @Test
1027     public void testMergeCompletionForLocalShard() {
1028         completeOperationLocal(transactionProxy -> {
1029             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1030
1031             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1032
1033             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1034
1035         }, createDataTree());
1036     }
1037
1038
1039     @Test
1040     public void testDeleteThrottlingWhenShardFound() {
1041
1042         throttleOperation(transactionProxy -> {
1043             expectIncompleteBatchedModifications();
1044
1045             transactionProxy.delete(TestModel.TEST_PATH);
1046
1047             transactionProxy.delete(TestModel.TEST_PATH);
1048         });
1049     }
1050
1051
1052     @Test
1053     public void testDeleteThrottlingWhenShardNotFound() {
1054
1055         completeOperation(transactionProxy -> {
1056             expectBatchedModifications(2);
1057
1058             transactionProxy.delete(TestModel.TEST_PATH);
1059
1060             transactionProxy.delete(TestModel.TEST_PATH);
1061         }, false);
1062     }
1063
1064     @Test
1065     public void testDeleteCompletionForLocalShard() {
1066         completeOperationLocal(transactionProxy -> {
1067
1068             transactionProxy.delete(TestModel.TEST_PATH);
1069
1070             transactionProxy.delete(TestModel.TEST_PATH);
1071         }, createDataTree());
1072
1073     }
1074
1075     @Test
1076     public void testDeleteCompletion() {
1077         completeOperation(transactionProxy -> {
1078             expectBatchedModifications(2);
1079
1080             transactionProxy.delete(TestModel.TEST_PATH);
1081
1082             transactionProxy.delete(TestModel.TEST_PATH);
1083         });
1084
1085     }
1086
1087     @Test
1088     public void testReadThrottlingWhenShardFound() {
1089
1090         throttleOperation(transactionProxy -> {
1091             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1092                     any(ActorSelection.class), eqReadData());
1093
1094             transactionProxy.read(TestModel.TEST_PATH);
1095
1096             transactionProxy.read(TestModel.TEST_PATH);
1097         });
1098     }
1099
1100     @Test
1101     public void testReadThrottlingWhenShardNotFound() {
1102
1103         completeOperation(transactionProxy -> {
1104             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1105                     any(ActorSelection.class), eqReadData());
1106
1107             transactionProxy.read(TestModel.TEST_PATH);
1108
1109             transactionProxy.read(TestModel.TEST_PATH);
1110         }, false);
1111     }
1112
1113
1114     @Test
1115     public void testReadCompletion() {
1116         completeOperation(transactionProxy -> {
1117             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1118
1119             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1120                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1121
1122             transactionProxy.read(TestModel.TEST_PATH);
1123
1124             transactionProxy.read(TestModel.TEST_PATH);
1125         });
1126
1127     }
1128
1129     @Test
1130     public void testReadCompletionForLocalShard() {
1131         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1132         completeOperationLocal(transactionProxy -> {
1133             transactionProxy.read(TestModel.TEST_PATH);
1134
1135             transactionProxy.read(TestModel.TEST_PATH);
1136         }, createDataTree(nodeToRead));
1137
1138     }
1139
1140     @Test
1141     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1142         completeOperationLocal(transactionProxy -> {
1143             transactionProxy.read(TestModel.TEST_PATH);
1144
1145             transactionProxy.read(TestModel.TEST_PATH);
1146         }, createDataTree());
1147
1148     }
1149
1150     @Test
1151     public void testExistsThrottlingWhenShardFound() {
1152
1153         throttleOperation(transactionProxy -> {
1154             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1155                     any(ActorSelection.class), eqDataExists());
1156
1157             transactionProxy.exists(TestModel.TEST_PATH);
1158
1159             transactionProxy.exists(TestModel.TEST_PATH);
1160         });
1161     }
1162
1163     @Test
1164     public void testExistsThrottlingWhenShardNotFound() {
1165
1166         completeOperation(transactionProxy -> {
1167             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1168                     any(ActorSelection.class), eqDataExists());
1169
1170             transactionProxy.exists(TestModel.TEST_PATH);
1171
1172             transactionProxy.exists(TestModel.TEST_PATH);
1173         }, false);
1174     }
1175
1176
1177     @Test
1178     public void testExistsCompletion() {
1179         completeOperation(transactionProxy -> {
1180             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1181                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1182
1183             transactionProxy.exists(TestModel.TEST_PATH);
1184
1185             transactionProxy.exists(TestModel.TEST_PATH);
1186         });
1187
1188     }
1189
1190     @Test
1191     public void testExistsCompletionForLocalShard() {
1192         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1193         completeOperationLocal(transactionProxy -> {
1194             transactionProxy.exists(TestModel.TEST_PATH);
1195
1196             transactionProxy.exists(TestModel.TEST_PATH);
1197         }, createDataTree(nodeToRead));
1198
1199     }
1200
1201     @Test
1202     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1203         completeOperationLocal(transactionProxy -> {
1204             transactionProxy.exists(TestModel.TEST_PATH);
1205
1206             transactionProxy.exists(TestModel.TEST_PATH);
1207         }, createDataTree());
1208
1209     }
1210
1211     @Test
1212     public void testReadyThrottling() {
1213
1214         throttleOperation(transactionProxy -> {
1215             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1216
1217             expectBatchedModifications(1);
1218
1219             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1220
1221             transactionProxy.ready();
1222         });
1223     }
1224
1225     @Test
1226     public void testReadyThrottlingWithTwoTransactionContexts() {
1227         throttleOperation(transactionProxy -> {
1228             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1229             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1230
1231             expectBatchedModifications(2);
1232
1233             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1234
1235             // Trying to write to Cars will cause another transaction context to get created
1236             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1237
1238             // Now ready should block for both transaction contexts
1239             transactionProxy.ready();
1240         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1241                 .getOperationTimeoutInMillis()) * 2);
1242     }
1243
1244     private void testModificationOperationBatching(final TransactionType type) throws Exception {
1245         int shardBatchedModificationCount = 3;
1246         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1247
1248         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1249
1250         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1251
1252         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1253         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1254
1255         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1256         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1257
1258         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1259         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1260
1261         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1262         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1263
1264         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1265         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1266
1267         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1268         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1269
1270         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1271         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1272
1273         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1274
1275         transactionProxy.write(writePath1, writeNode1);
1276         transactionProxy.write(writePath2, writeNode2);
1277         transactionProxy.delete(deletePath1);
1278         transactionProxy.merge(mergePath1, mergeNode1);
1279         transactionProxy.merge(mergePath2, mergeNode2);
1280         transactionProxy.write(writePath3, writeNode3);
1281         transactionProxy.merge(mergePath3, mergeNode3);
1282         transactionProxy.delete(deletePath2);
1283
1284         // This sends the last batch.
1285         transactionProxy.ready();
1286
1287         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1288         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1289
1290         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1291                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1292
1293         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1294                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1295
1296         verifyBatchedModifications(batchedModifications.get(2), true, true,
1297                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1298
1299         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1300     }
1301
1302     @Test
1303     public void testReadWriteModificationOperationBatching() throws Exception {
1304         testModificationOperationBatching(READ_WRITE);
1305     }
1306
1307     @Test
1308     public void testWriteOnlyModificationOperationBatching() throws Exception {
1309         testModificationOperationBatching(WRITE_ONLY);
1310     }
1311
1312     @Test
1313     public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1314         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1315         testModificationOperationBatching(WRITE_ONLY);
1316     }
1317
1318     @Test
1319     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1320
1321         int shardBatchedModificationCount = 10;
1322         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1323
1324         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1325
1326         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1327
1328         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1329         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1330
1331         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1332         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1333
1334         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1335         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1336
1337         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1338         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1339
1340         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1341
1342         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1343                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1344
1345         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1346                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1347
1348         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1349                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1350
1351         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1352
1353         transactionProxy.write(writePath1, writeNode1);
1354         transactionProxy.write(writePath2, writeNode2);
1355
1356         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1357
1358         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1359         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1360
1361         transactionProxy.merge(mergePath1, mergeNode1);
1362         transactionProxy.merge(mergePath2, mergeNode2);
1363
1364         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1365
1366         transactionProxy.delete(deletePath);
1367
1368         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1369         assertEquals("Exists response", true, exists);
1370
1371         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1372         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1373
1374         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1375         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1376
1377         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1378                 new WriteModification(writePath2, writeNode2));
1379
1380         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1381                 new MergeModification(mergePath2, mergeNode2));
1382
1383         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1384
1385         InOrder inOrder = Mockito.inOrder(mockActorContext);
1386         inOrder.verify(mockActorContext).executeOperationAsync(
1387                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1388
1389         inOrder.verify(mockActorContext).executeOperationAsync(
1390                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1391
1392         inOrder.verify(mockActorContext).executeOperationAsync(
1393                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1394
1395         inOrder.verify(mockActorContext).executeOperationAsync(
1396                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1397
1398         inOrder.verify(mockActorContext).executeOperationAsync(
1399                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1400
1401         inOrder.verify(mockActorContext).executeOperationAsync(
1402                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1403     }
1404
1405     @Test
1406     public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1407             java.util.concurrent.TimeoutException {
1408         SchemaContext schemaContext = SchemaContextHelper.full();
1409         Configuration configuration = mock(Configuration.class);
1410         doReturn(configuration).when(mockActorContext).getConfiguration();
1411         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1412         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1413
1414         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1415         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1416
1417         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1418         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1419
1420         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1421
1422         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1423
1424         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1425
1426         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1427                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1428
1429         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1430
1431         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1432
1433         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1434
1435         @SuppressWarnings("unchecked")
1436         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1437
1438         for (NormalizedNode<?,?> node : collection) {
1439             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1440         }
1441
1442         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1443                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1444
1445         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1446
1447         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1448                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1449
1450         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1451     }
1452
1453
1454     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1455         ActorSystem actorSystem = getSystem();
1456         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1457
1458         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1459                 .actorSelection(shardActorRef.path().toString());
1460
1461         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1462                 .findPrimaryShardAsync(eq(shardName));
1463
1464         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1465
1466         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1467                 .actorSelection(txActorRef.path().toString());
1468
1469         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1470                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1471                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1472
1473         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1474                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1475     }
1476 }