Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Throwables;
33 import com.google.common.collect.ImmutableSortedSet;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.FluentFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.SortedSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.config.Configuration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
68 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
69 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.mdsal.common.api.ReadFailedException;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.Promise;
84
85 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
86 public class TransactionProxyTest extends AbstractTransactionProxyTest {
87
88     @SuppressWarnings("serial")
89     static class TestException extends RuntimeException {
90     }
91
92     interface Invoker {
93         FluentFuture<?> invoke(TransactionProxy proxy);
94     }
95
96     @Test
97     public void testRead() throws Exception {
98         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
99
100         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
101
102         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
103                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
104
105         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
106                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
107
108         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
109
110         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
111
112         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
113                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
114
115         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
116
117         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
118
119         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
120     }
121
122     @Test(expected = ReadFailedException.class)
123     public void testReadWithInvalidReplyMessageType() throws Throwable {
124         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
125
126         doReturn(Futures.successful(new Object())).when(mockActorContext)
127                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
128
129         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
130
131         try {
132             transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
133         } catch (ExecutionException e) {
134             throw e.getCause();
135         }
136     }
137
138     @Test(expected = TestException.class)
139     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
140         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
141
142         doReturn(Futures.failed(new TestException())).when(mockActorContext)
143                 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
144
145         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
146
147         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
148     }
149
150     private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
151             throws Throwable {
152         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
153
154         if (exToThrow instanceof PrimaryNotFoundException) {
155             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
156         } else {
157             doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
158                     .findPrimaryShardAsync(anyString());
159         }
160
161         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
162                 any(ActorSelection.class), any(), any(Timeout.class));
163
164         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
165
166         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
167     }
168
169     private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
170         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
171     }
172
173     @Test(expected = PrimaryNotFoundException.class)
174     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
175         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
176     }
177
178     @Test(expected = TestException.class)
179     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
180         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
181                 new TestException()));
182     }
183
184     @Test(expected = TestException.class)
185     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
186         testReadWithExceptionOnInitialCreateTransaction(new TestException());
187     }
188
189     @Test
190     public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
191         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
192
193         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
194
195         expectBatchedModifications(actorRef, 1);
196
197         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
198                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
199
200         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
201
202         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
203
204         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
205                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
206
207         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
208         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
209
210         InOrder inOrder = Mockito.inOrder(mockActorContext);
211         inOrder.verify(mockActorContext).executeOperationAsync(
212                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
213
214         inOrder.verify(mockActorContext).executeOperationAsync(
215                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
216     }
217
218     @Test(expected = IllegalStateException.class)
219     public void testReadPreConditionCheck() {
220         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
221         transactionProxy.read(TestModel.TEST_PATH);
222     }
223
224     @Test(expected = IllegalArgumentException.class)
225     public void testInvalidCreateTransactionReply() throws Throwable {
226         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
227
228         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
229                 .actorSelection(actorRef.path().toString());
230
231         doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
232                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
233
234         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
235             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
236             any(Timeout.class));
237
238         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
239
240         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
241     }
242
243     @Test
244     public void testExists() throws Exception {
245         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
246
247         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
248
249         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
250                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
251
252         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
253
254         assertEquals("Exists response", false, exists);
255
256         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
257                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
258
259         exists = transactionProxy.exists(TestModel.TEST_PATH).get();
260
261         assertEquals("Exists response", true, exists);
262     }
263
264     @Test(expected = PrimaryNotFoundException.class)
265     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
266         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
267             proxy -> proxy.exists(TestModel.TEST_PATH));
268     }
269
270     @Test(expected = ReadFailedException.class)
271     public void testExistsWithInvalidReplyMessageType() throws Throwable {
272         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
273
274         doReturn(Futures.successful(new Object())).when(mockActorContext)
275                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
276
277         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
278
279         try {
280             transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
281         } catch (ExecutionException e) {
282             throw e.getCause();
283         }
284     }
285
286     @Test(expected = TestException.class)
287     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
288         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
289
290         doReturn(Futures.failed(new TestException())).when(mockActorContext)
291                 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
292
293         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
294
295         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
296     }
297
298     @Test
299     public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
300         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
301
302         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
303
304         expectBatchedModifications(actorRef, 1);
305
306         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
307                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
308
309         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
310
311         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
312
313         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
314
315         assertEquals("Exists response", true, exists);
316
317         InOrder inOrder = Mockito.inOrder(mockActorContext);
318         inOrder.verify(mockActorContext).executeOperationAsync(
319                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
320
321         inOrder.verify(mockActorContext).executeOperationAsync(
322                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
323     }
324
325     @Test(expected = IllegalStateException.class)
326     public void testExistsPreConditionCheck() {
327         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
328         transactionProxy.exists(TestModel.TEST_PATH);
329     }
330
331     @Test
332     public void testWrite() {
333         dataStoreContextBuilder.shardBatchedModificationCount(1);
334         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
335
336         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
337
338         expectBatchedModifications(actorRef, 1);
339
340         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
341
342         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
343
344         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
345     }
346
347     @Test
348     @SuppressWarnings("checkstyle:IllegalCatch")
349     public void testWriteAfterAsyncRead() throws Exception {
350         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
351                 DefaultShardStrategy.DEFAULT_SHARD);
352
353         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
354         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
355                 eq(getSystem().actorSelection(actorRef.path())),
356                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
357
358         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
359                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
360
361         expectBatchedModificationsReady(actorRef);
362
363         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
364
365         final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
366
367         final CountDownLatch readComplete = new CountDownLatch(1);
368         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
369         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
370                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
371                     @Override
372                     public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
373                         try {
374                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
375                         } catch (Exception e) {
376                             caughtEx.set(e);
377                         } finally {
378                             readComplete.countDown();
379                         }
380                     }
381
382                     @Override
383                     public void onFailure(final Throwable failure) {
384                         caughtEx.set(failure);
385                         readComplete.countDown();
386                     }
387                 }, MoreExecutors.directExecutor());
388
389         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
390
391         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
392
393         final Throwable t = caughtEx.get();
394         if (t != null) {
395             Throwables.propagateIfPossible(t, Exception.class);
396             throw new RuntimeException(t);
397         }
398
399         // This sends the batched modification.
400         transactionProxy.ready();
401
402         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
403     }
404
405     @Test(expected = IllegalStateException.class)
406     public void testWritePreConditionCheck() {
407         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
408         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409     }
410
411     @Test(expected = IllegalStateException.class)
412     public void testWriteAfterReadyPreConditionCheck() {
413         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
414
415         transactionProxy.ready();
416
417         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
418     }
419
420     @Test
421     public void testMerge() {
422         dataStoreContextBuilder.shardBatchedModificationCount(1);
423         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
424
425         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
426
427         expectBatchedModifications(actorRef, 1);
428
429         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
430
431         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
432
433         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
434     }
435
436     @Test
437     public void testDelete() {
438         dataStoreContextBuilder.shardBatchedModificationCount(1);
439         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
440
441         expectBatchedModifications(actorRef, 1);
442
443         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
444
445         transactionProxy.delete(TestModel.TEST_PATH);
446
447         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
448     }
449
450     @Test
451     public void testReadWrite() {
452         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
453
454         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
455
456         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
457                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
458
459         expectBatchedModifications(actorRef, 1);
460
461         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
462
463         transactionProxy.read(TestModel.TEST_PATH);
464
465         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
466
467         transactionProxy.read(TestModel.TEST_PATH);
468
469         transactionProxy.read(TestModel.TEST_PATH);
470
471         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
472         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
473
474         verifyBatchedModifications(batchedModifications.get(0), false,
475                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
476     }
477
478     @Test
479     public void testReadyWithReadWrite() {
480         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
481
482         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
483
484         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
485                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
486
487         expectBatchedModificationsReady(actorRef, true);
488
489         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
490
491         transactionProxy.read(TestModel.TEST_PATH);
492
493         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
494
495         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
496
497         assertTrue(ready instanceof SingleCommitCohortProxy);
498
499         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
500
501         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
502         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
503
504         verifyBatchedModifications(batchedModifications.get(0), true, true,
505                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
506
507         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
508     }
509
510     @Test
511     public void testReadyWithNoModifications() {
512         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
513
514         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
515                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
516
517         expectBatchedModificationsReady(actorRef, true);
518
519         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
520
521         transactionProxy.read(TestModel.TEST_PATH);
522
523         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
524
525         assertTrue(ready instanceof SingleCommitCohortProxy);
526
527         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
528
529         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
530         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
531
532         verifyBatchedModifications(batchedModifications.get(0), true, true);
533     }
534
535     @Test
536     public void testReadyWithMultipleShardWrites() {
537         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
538
539         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
540                 TestModel.JUNK_QNAME.getLocalName());
541
542         expectBatchedModificationsReady(actorRef1);
543         expectBatchedModificationsReady(actorRef2);
544
545         ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
546
547         doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
548                 .actorSelection(actorRef3.path().toString());
549
550         doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
551                 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
552
553         expectReadyLocalTransaction(actorRef3, false);
554
555         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
556
557         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
558         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
559         transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
560
561         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
562
563         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
564
565         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
566                 actorSelection(actorRef2), actorSelection(actorRef3));
567
568         SortedSet<String> expShardNames =
569                 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
570                         TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
571
572         ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
573         verify(mockActorContext).executeOperationAsync(
574                 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
575         assertEquals("Participating shards present", true,
576                 batchedMods.getValue().getParticipatingShardNames().isPresent());
577         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
578
579         batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
580         verify(mockActorContext).executeOperationAsync(
581                 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
582         assertEquals("Participating shards present", true,
583                 batchedMods.getValue().getParticipatingShardNames().isPresent());
584         assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
585
586         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
587         verify(mockActorContext).executeOperationAsync(
588                 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
589         assertEquals("Participating shards present", true,
590                 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
591         assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
592     }
593
594     @Test
595     public void testReadyWithWriteOnlyAndLastBatchPending() {
596         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
597
598         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
599
600         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
601
602         expectBatchedModificationsReady(actorRef, true);
603
604         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
605
606         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
607
608         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
609
610         assertTrue(ready instanceof SingleCommitCohortProxy);
611
612         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
613
614         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
615         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
616
617         verifyBatchedModifications(batchedModifications.get(0), true, true,
618                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
619     }
620
621     @Test
622     public void testReadyWithWriteOnlyAndLastBatchEmpty() {
623         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
624         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
625
626         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
627
628         expectBatchedModificationsReady(actorRef, true);
629
630         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
631
632         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
633
634         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635
636         assertTrue(ready instanceof SingleCommitCohortProxy);
637
638         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
639
640         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
641         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
642
643         verifyBatchedModifications(batchedModifications.get(0), false,
644                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
645
646         verifyBatchedModifications(batchedModifications.get(1), true, true);
647     }
648
649     @Test
650     public void testReadyWithReplyFailure() {
651         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
652
653         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
654
655         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
656
657         expectFailedBatchedModifications(actorRef);
658
659         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
660
661         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
662
663         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
664
665         assertTrue(ready instanceof SingleCommitCohortProxy);
666
667         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
668     }
669
670     @Test
671     public void testReadyWithDebugContextEnabled() {
672         dataStoreContextBuilder.transactionDebugContextEnabled(true);
673
674         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
675
676         expectBatchedModificationsReady(actorRef, true);
677
678         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
679
680         transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
681
682         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
683
684         assertTrue(ready instanceof DebugThreePhaseCommitCohort);
685
686         verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
687     }
688
689     @Test
690     public void testReadyWithLocalTransaction() {
691         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
692
693         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
694                 .actorSelection(shardActorRef.path().toString());
695
696         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
697                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
698
699         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
700
701         expectReadyLocalTransaction(shardActorRef, true);
702
703         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
704         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
705
706         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
707         assertTrue(ready instanceof SingleCommitCohortProxy);
708         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
709
710         ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
711         verify(mockActorContext).executeOperationAsync(
712                 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
713         assertEquals("Participating shards present", false,
714                 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
715     }
716
717     @Test
718     public void testReadyWithLocalTransactionWithFailure() {
719         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
720
721         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
722                 .actorSelection(shardActorRef.path().toString());
723
724         DataTree mockDataTree = createDataTree();
725         DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
726         doThrow(new RuntimeException("mock")).when(mockModification).ready();
727
728         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
729                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
730
731         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
732
733         expectReadyLocalTransaction(shardActorRef, true);
734
735         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
737
738         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
739         assertTrue(ready instanceof SingleCommitCohortProxy);
740         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
741     }
742
743     private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
744         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
745
746         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
747
748         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
749
750         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
751
752         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
753
754         transactionProxy.delete(TestModel.TEST_PATH);
755
756         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
757
758         assertTrue(ready instanceof SingleCommitCohortProxy);
759
760         verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
761     }
762
763     @Test
764     public void testWriteOnlyTxWithPrimaryNotFoundException() {
765         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
766     }
767
768     @Test
769     public void testWriteOnlyTxWithNotInitializedException() {
770         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
771     }
772
773     @Test
774     public void testWriteOnlyTxWithNoShardLeaderException() {
775         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
776     }
777
778     @Test
779     public void testReadyWithInvalidReplyMessageType() {
780         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
781         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
782
783         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
784                 TestModel.JUNK_QNAME.getLocalName());
785
786         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
787                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
788
789         expectBatchedModificationsReady(actorRef2);
790
791         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
792
793         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
794         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
795
796         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
797
798         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
799
800         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
801                 IllegalArgumentException.class);
802     }
803
804     @Test
805     public void testGetIdentifier() {
806         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
807         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
808
809         Object id = transactionProxy.getIdentifier();
810         assertNotNull("getIdentifier returned null", id);
811         assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
812     }
813
814     @Test
815     public void testClose() {
816         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
817
818         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
819                 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
820
821         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
822
823         transactionProxy.read(TestModel.TEST_PATH);
824
825         transactionProxy.close();
826
827         verify(mockActorContext).sendOperationAsync(
828                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
829     }
830
831     private interface TransactionProxyOperation {
832         void run(TransactionProxy transactionProxy);
833     }
834
835     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
836         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
837     }
838
839     private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
840         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
841                 dataTree);
842     }
843
844     private void throttleOperation(final TransactionProxyOperation operation) {
845         throttleOperation(operation, 1, true);
846     }
847
848     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
849             final boolean shardFound) {
850         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
851                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
852     }
853
854     private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
855             final boolean shardFound, final long expectedCompletionTime) {
856         ActorSystem actorSystem = getSystem();
857         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
858
859         // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
860         // we now allow one extra permit to be allowed for ready
861         doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
862                 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
863                         .getDatastoreContext();
864
865         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
866                 .actorSelection(shardActorRef.path().toString());
867
868         if (shardFound) {
869             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
870                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
871             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
872                     .findPrimaryShardAsync(eq("cars"));
873
874         } else {
875             doReturn(Futures.failed(new Exception("not found")))
876                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
877         }
878
879         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
880                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
881                 any(Timeout.class));
882
883         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
884
885         long start = System.nanoTime();
886
887         operation.run(transactionProxy);
888
889         long end = System.nanoTime();
890
891         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
892                 expectedCompletionTime, end - start),
893                 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
894
895     }
896
897     private void completeOperation(final TransactionProxyOperation operation) {
898         completeOperation(operation, true);
899     }
900
901     private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
902         ActorSystem actorSystem = getSystem();
903         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
904
905         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
906                 .actorSelection(shardActorRef.path().toString());
907
908         if (shardFound) {
909             doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
910                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
911         } else {
912             doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
913                     .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
914         }
915
916         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
917         String actorPath = txActorRef.path().toString();
918         CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
919                 DataStoreVersions.CURRENT_VERSION);
920
921         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
922
923         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
924                 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
925                 any(Timeout.class));
926
927         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
928
929         long start = System.nanoTime();
930
931         operation.run(transactionProxy);
932
933         long end = System.nanoTime();
934
935         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
936                 .getOperationTimeoutInMillis());
937         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
938                 expected, end - start), end - start <= expected);
939     }
940
941     private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
942         ActorSystem actorSystem = getSystem();
943         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
944
945         doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
946                 .actorSelection(shardActorRef.path().toString());
947
948         doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
949                 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
950
951         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
952
953         long start = System.nanoTime();
954
955         operation.run(transactionProxy);
956
957         long end = System.nanoTime();
958
959         long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
960                 .getOperationTimeoutInMillis());
961         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
962                 end - start <= expected);
963     }
964
965     private static DataTree createDataTree() {
966         DataTree dataTree = mock(DataTree.class);
967         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
968         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
969
970         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
971         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
972
973         return dataTree;
974     }
975
976     private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
977         DataTree dataTree = mock(DataTree.class);
978         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
979         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
980
981         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
982         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
983         doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
984             any(YangInstanceIdentifier.class));
985
986         return dataTree;
987     }
988
989
990     @Test
991     public void testWriteCompletionForLocalShard() {
992         completeOperationLocal(transactionProxy -> {
993             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
994
995             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
996
997             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
998
999         }, createDataTree());
1000     }
1001
1002     @Test
1003     public void testWriteThrottlingWhenShardFound() {
1004         throttleOperation(transactionProxy -> {
1005             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1006
1007             expectIncompleteBatchedModifications();
1008
1009             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1010
1011             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1012         });
1013     }
1014
1015     @Test
1016     public void testWriteThrottlingWhenShardNotFound() {
1017         // Confirm that there is no throttling when the Shard is not found
1018         completeOperation(transactionProxy -> {
1019             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1020
1021             expectBatchedModifications(2);
1022
1023             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1024
1025             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1026         }, false);
1027
1028     }
1029
1030
1031     @Test
1032     public void testWriteCompletion() {
1033         completeOperation(transactionProxy -> {
1034             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1035
1036             expectBatchedModifications(2);
1037
1038             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1039
1040             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1041         });
1042     }
1043
1044     @Test
1045     public void testMergeThrottlingWhenShardFound() {
1046         throttleOperation(transactionProxy -> {
1047             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1048
1049             expectIncompleteBatchedModifications();
1050
1051             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1052
1053             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1054         });
1055     }
1056
1057     @Test
1058     public void testMergeThrottlingWhenShardNotFound() {
1059         completeOperation(transactionProxy -> {
1060             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1061
1062             expectBatchedModifications(2);
1063
1064             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1065
1066             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1067         }, false);
1068     }
1069
1070     @Test
1071     public void testMergeCompletion() {
1072         completeOperation(transactionProxy -> {
1073             NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1074
1075             expectBatchedModifications(2);
1076
1077             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1078
1079             transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1080         });
1081
1082     }
1083
1084     @Test
1085     public void testMergeCompletionForLocalShard() {
1086         completeOperationLocal(transactionProxy -> {
1087             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1088
1089             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1090
1091             transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1092
1093         }, createDataTree());
1094     }
1095
1096
1097     @Test
1098     public void testDeleteThrottlingWhenShardFound() {
1099
1100         throttleOperation(transactionProxy -> {
1101             expectIncompleteBatchedModifications();
1102
1103             transactionProxy.delete(TestModel.TEST_PATH);
1104
1105             transactionProxy.delete(TestModel.TEST_PATH);
1106         });
1107     }
1108
1109
1110     @Test
1111     public void testDeleteThrottlingWhenShardNotFound() {
1112
1113         completeOperation(transactionProxy -> {
1114             expectBatchedModifications(2);
1115
1116             transactionProxy.delete(TestModel.TEST_PATH);
1117
1118             transactionProxy.delete(TestModel.TEST_PATH);
1119         }, false);
1120     }
1121
1122     @Test
1123     public void testDeleteCompletionForLocalShard() {
1124         completeOperationLocal(transactionProxy -> {
1125
1126             transactionProxy.delete(TestModel.TEST_PATH);
1127
1128             transactionProxy.delete(TestModel.TEST_PATH);
1129         }, createDataTree());
1130
1131     }
1132
1133     @Test
1134     public void testDeleteCompletion() {
1135         completeOperation(transactionProxy -> {
1136             expectBatchedModifications(2);
1137
1138             transactionProxy.delete(TestModel.TEST_PATH);
1139
1140             transactionProxy.delete(TestModel.TEST_PATH);
1141         });
1142
1143     }
1144
1145     @Test
1146     public void testReadThrottlingWhenShardFound() {
1147
1148         throttleOperation(transactionProxy -> {
1149             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1150                     any(ActorSelection.class), eqReadData());
1151
1152             transactionProxy.read(TestModel.TEST_PATH);
1153
1154             transactionProxy.read(TestModel.TEST_PATH);
1155         });
1156     }
1157
1158     @Test
1159     public void testReadThrottlingWhenShardNotFound() {
1160
1161         completeOperation(transactionProxy -> {
1162             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1163                     any(ActorSelection.class), eqReadData());
1164
1165             transactionProxy.read(TestModel.TEST_PATH);
1166
1167             transactionProxy.read(TestModel.TEST_PATH);
1168         }, false);
1169     }
1170
1171
1172     @Test
1173     public void testReadCompletion() {
1174         completeOperation(transactionProxy -> {
1175             NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1176
1177             doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1178                     any(ActorSelection.class), eqReadData(), any(Timeout.class));
1179
1180             transactionProxy.read(TestModel.TEST_PATH);
1181
1182             transactionProxy.read(TestModel.TEST_PATH);
1183         });
1184
1185     }
1186
1187     @Test
1188     public void testReadCompletionForLocalShard() {
1189         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190         completeOperationLocal(transactionProxy -> {
1191             transactionProxy.read(TestModel.TEST_PATH);
1192
1193             transactionProxy.read(TestModel.TEST_PATH);
1194         }, createDataTree(nodeToRead));
1195
1196     }
1197
1198     @Test
1199     public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1200         completeOperationLocal(transactionProxy -> {
1201             transactionProxy.read(TestModel.TEST_PATH);
1202
1203             transactionProxy.read(TestModel.TEST_PATH);
1204         }, createDataTree());
1205
1206     }
1207
1208     @Test
1209     public void testExistsThrottlingWhenShardFound() {
1210
1211         throttleOperation(transactionProxy -> {
1212             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1213                     any(ActorSelection.class), eqDataExists());
1214
1215             transactionProxy.exists(TestModel.TEST_PATH);
1216
1217             transactionProxy.exists(TestModel.TEST_PATH);
1218         });
1219     }
1220
1221     @Test
1222     public void testExistsThrottlingWhenShardNotFound() {
1223
1224         completeOperation(transactionProxy -> {
1225             doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1226                     any(ActorSelection.class), eqDataExists());
1227
1228             transactionProxy.exists(TestModel.TEST_PATH);
1229
1230             transactionProxy.exists(TestModel.TEST_PATH);
1231         }, false);
1232     }
1233
1234
1235     @Test
1236     public void testExistsCompletion() {
1237         completeOperation(transactionProxy -> {
1238             doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1239                     any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1240
1241             transactionProxy.exists(TestModel.TEST_PATH);
1242
1243             transactionProxy.exists(TestModel.TEST_PATH);
1244         });
1245
1246     }
1247
1248     @Test
1249     public void testExistsCompletionForLocalShard() {
1250         final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1251         completeOperationLocal(transactionProxy -> {
1252             transactionProxy.exists(TestModel.TEST_PATH);
1253
1254             transactionProxy.exists(TestModel.TEST_PATH);
1255         }, createDataTree(nodeToRead));
1256
1257     }
1258
1259     @Test
1260     public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1261         completeOperationLocal(transactionProxy -> {
1262             transactionProxy.exists(TestModel.TEST_PATH);
1263
1264             transactionProxy.exists(TestModel.TEST_PATH);
1265         }, createDataTree());
1266
1267     }
1268
1269     @Test
1270     public void testReadyThrottling() {
1271
1272         throttleOperation(transactionProxy -> {
1273             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1274
1275             expectBatchedModifications(1);
1276
1277             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1278
1279             transactionProxy.ready();
1280         });
1281     }
1282
1283     @Test
1284     public void testReadyThrottlingWithTwoTransactionContexts() {
1285         throttleOperation(transactionProxy -> {
1286             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1287             NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1288
1289             expectBatchedModifications(2);
1290
1291             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1292
1293             // Trying to write to Cars will cause another transaction context to get created
1294             transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1295
1296             // Now ready should block for both transaction contexts
1297             transactionProxy.ready();
1298         }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1299                 .getOperationTimeoutInMillis()) * 2);
1300     }
1301
1302     private void testModificationOperationBatching(final TransactionType type) {
1303         int shardBatchedModificationCount = 3;
1304         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1305
1306         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1307
1308         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1309
1310         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1311         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1312
1313         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1314         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1315
1316         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1317         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1318
1319         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1320         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1321
1322         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1323         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1324
1325         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1326         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1327
1328         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1329         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1330
1331         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1332
1333         transactionProxy.write(writePath1, writeNode1);
1334         transactionProxy.write(writePath2, writeNode2);
1335         transactionProxy.delete(deletePath1);
1336         transactionProxy.merge(mergePath1, mergeNode1);
1337         transactionProxy.merge(mergePath2, mergeNode2);
1338         transactionProxy.write(writePath3, writeNode3);
1339         transactionProxy.merge(mergePath3, mergeNode3);
1340         transactionProxy.delete(deletePath2);
1341
1342         // This sends the last batch.
1343         transactionProxy.ready();
1344
1345         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1346         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1347
1348         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1349                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1350
1351         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1352                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1353
1354         verifyBatchedModifications(batchedModifications.get(2), true, true,
1355                 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1356
1357         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1358     }
1359
1360     @Test
1361     public void testReadWriteModificationOperationBatching() {
1362         testModificationOperationBatching(READ_WRITE);
1363     }
1364
1365     @Test
1366     public void testWriteOnlyModificationOperationBatching() {
1367         testModificationOperationBatching(WRITE_ONLY);
1368     }
1369
1370     @Test
1371     public void testOptimizedWriteOnlyModificationOperationBatching() {
1372         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1373         testModificationOperationBatching(WRITE_ONLY);
1374     }
1375
1376     @Test
1377     public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1378
1379         int shardBatchedModificationCount = 10;
1380         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1381
1382         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1383
1384         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1385
1386         final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1387         final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1388
1389         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1390         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1391
1392         final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1393         final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1394
1395         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1396         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1397
1398         final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1399
1400         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1401                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1402
1403         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1404                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1405
1406         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1407                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1408
1409         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1410
1411         transactionProxy.write(writePath1, writeNode1);
1412         transactionProxy.write(writePath2, writeNode2);
1413
1414         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1415
1416         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1417         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1418
1419         transactionProxy.merge(mergePath1, mergeNode1);
1420         transactionProxy.merge(mergePath2, mergeNode2);
1421
1422         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1423
1424         transactionProxy.delete(deletePath);
1425
1426         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1427         assertEquals("Exists response", true, exists);
1428
1429         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1430         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1431
1432         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1433         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1434
1435         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1436                 new WriteModification(writePath2, writeNode2));
1437
1438         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1439                 new MergeModification(mergePath2, mergeNode2));
1440
1441         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1442
1443         InOrder inOrder = Mockito.inOrder(mockActorContext);
1444         inOrder.verify(mockActorContext).executeOperationAsync(
1445                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1446
1447         inOrder.verify(mockActorContext).executeOperationAsync(
1448                 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1449
1450         inOrder.verify(mockActorContext).executeOperationAsync(
1451                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1452
1453         inOrder.verify(mockActorContext).executeOperationAsync(
1454                 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1455
1456         inOrder.verify(mockActorContext).executeOperationAsync(
1457                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1458
1459         inOrder.verify(mockActorContext).executeOperationAsync(
1460                 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1461     }
1462
1463     @Test
1464     public void testReadRoot() throws InterruptedException, ExecutionException,
1465             java.util.concurrent.TimeoutException {
1466         SchemaContext schemaContext = SchemaContextHelper.full();
1467         Configuration configuration = mock(Configuration.class);
1468         doReturn(configuration).when(mockActorContext).getConfiguration();
1469         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1470         doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1471
1472         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1473         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1474
1475         setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1476         setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1477
1478         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1479
1480         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1481
1482         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1483
1484         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1485                 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1486
1487         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1488
1489         NormalizedNode<?, ?> normalizedNode = readOptional.get();
1490
1491         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1492
1493         @SuppressWarnings("unchecked")
1494         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1495
1496         for (NormalizedNode<?,?> node : collection) {
1497             assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1498         }
1499
1500         assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1501                 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1502
1503         assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1504
1505         assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1506                 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1507
1508         assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1509     }
1510
1511
1512     private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1513         ActorSystem actorSystem = getSystem();
1514         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1515
1516         doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1517                 .actorSelection(shardActorRef.path().toString());
1518
1519         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1520                 .findPrimaryShardAsync(eq(shardName));
1521
1522         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1523
1524         doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1525                 .actorSelection(txActorRef.path().toString());
1526
1527         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1528                 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1529                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1530
1531         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1532                 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));
1533     }
1534 }