Merge "Bug 2588: Fix read transaction failures"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.mockito.InOrder;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
39 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
40 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
47 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
48 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.Promise;
59 import scala.concurrent.duration.Duration;
60
61 @SuppressWarnings("resource")
62 public class TransactionProxyTest extends AbstractTransactionProxyTest {
63
64     @SuppressWarnings("serial")
65     static class TestException extends RuntimeException {
66     }
67
68     static interface Invoker {
69         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
70     }
71
72     @Test
73     public void testRead() throws Exception {
74         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
75
76         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
77
78         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
79                 eq(actorSelection(actorRef)), eqSerializedReadData());
80
81         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
82                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
83
84         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
85
86         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
87
88         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
89                 eq(actorSelection(actorRef)), eqSerializedReadData());
90
91         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
92
93         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
94
95         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
96     }
97
98     @Test(expected = ReadFailedException.class)
99     public void testReadWithInvalidReplyMessageType() throws Exception {
100         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
101
102         doReturn(Futures.successful(new Object())).when(mockActorContext).
103                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
104
105         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
106
107         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
108     }
109
110     @Test(expected = TestException.class)
111     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
112         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
113
114         doReturn(Futures.failed(new TestException())).when(mockActorContext).
115                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
116
117         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
118
119         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
120     }
121
122     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
123             throws Throwable {
124         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
125
126         if (exToThrow instanceof PrimaryNotFoundException) {
127             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
128         } else {
129             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
130                     when(mockActorContext).findPrimaryShardAsync(anyString());
131         }
132
133         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
134                 any(ActorSelection.class), any());
135
136         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
137
138         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
139     }
140
141     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
142         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
143             @Override
144             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
145                 return proxy.read(TestModel.TEST_PATH);
146             }
147         });
148     }
149
150     @Test(expected = PrimaryNotFoundException.class)
151     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
152         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
153     }
154
155     @Test(expected = TimeoutException.class)
156     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
157         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
158                 new Exception("reason")));
159     }
160
161     @Test(expected = TestException.class)
162     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
163         testReadWithExceptionOnInitialCreateTransaction(new TestException());
164     }
165
166     @Test
167     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
168         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
169
170         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
171
172         expectBatchedModifications(actorRef, 1);
173
174         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
175                 eq(actorSelection(actorRef)), eqSerializedReadData());
176
177         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
178
179         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
180
181         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
182                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
183
184         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
185         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
186
187         InOrder inOrder = Mockito.inOrder(mockActorContext);
188         inOrder.verify(mockActorContext).executeOperationAsync(
189                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
190
191         inOrder.verify(mockActorContext).executeOperationAsync(
192                 eq(actorSelection(actorRef)), eqSerializedReadData());
193     }
194
195     @Test(expected=IllegalStateException.class)
196     public void testReadPreConditionCheck() {
197         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
198         transactionProxy.read(TestModel.TEST_PATH);
199     }
200
201     @Test(expected=IllegalArgumentException.class)
202     public void testInvalidCreateTransactionReply() throws Throwable {
203         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
204
205         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
206             actorSelection(actorRef.path().toString());
207
208         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
209             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
210
211         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
212             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
213
214         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
215
216         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
217     }
218
219     @Test
220     public void testExists() throws Exception {
221         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
222
223         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
224
225         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
226                 eq(actorSelection(actorRef)), eqSerializedDataExists());
227
228         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
229
230         assertEquals("Exists response", false, exists);
231
232         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
233                 eq(actorSelection(actorRef)), eqSerializedDataExists());
234
235         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
236
237         assertEquals("Exists response", true, exists);
238     }
239
240     @Test(expected = PrimaryNotFoundException.class)
241     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
242         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
243             @Override
244             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
245                 return proxy.exists(TestModel.TEST_PATH);
246             }
247         });
248     }
249
250     @Test(expected = ReadFailedException.class)
251     public void testExistsWithInvalidReplyMessageType() throws Exception {
252         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
253
254         doReturn(Futures.successful(new Object())).when(mockActorContext).
255                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
256
257         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
258                 READ_ONLY);
259
260         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
261     }
262
263     @Test(expected = TestException.class)
264     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
265         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
266
267         doReturn(Futures.failed(new TestException())).when(mockActorContext).
268                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
269
270         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
271
272         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
273     }
274
275     @Test
276     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
277         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
278
279         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
280
281         expectBatchedModifications(actorRef, 1);
282
283         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
284                 eq(actorSelection(actorRef)), eqSerializedDataExists());
285
286         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
287
288         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
289
290         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
291
292         assertEquals("Exists response", true, exists);
293
294         InOrder inOrder = Mockito.inOrder(mockActorContext);
295         inOrder.verify(mockActorContext).executeOperationAsync(
296                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
297
298         inOrder.verify(mockActorContext).executeOperationAsync(
299                 eq(actorSelection(actorRef)), eqSerializedDataExists());
300     }
301
302     @Test(expected=IllegalStateException.class)
303     public void testExistsPreConditionCheck() {
304         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
305         transactionProxy.exists(TestModel.TEST_PATH);
306     }
307
308     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
309             Class<?>... expResultTypes) throws Exception {
310         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
311
312         int i = 0;
313         for( Future<Object> future: futures) {
314             assertNotNull("Recording operation Future is null", future);
315
316             Class<?> expResultType = expResultTypes[i++];
317             if(Throwable.class.isAssignableFrom(expResultType)) {
318                 try {
319                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
320                     fail("Expected exception from recording operation Future");
321                 } catch(Exception e) {
322                     // Expected
323                 }
324             } else {
325                 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
326                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
327             }
328         }
329     }
330
331     @Test
332     public void testWrite() throws Exception {
333         dataStoreContextBuilder.shardBatchedModificationCount(1);
334         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
335
336         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
337
338         expectBatchedModifications(actorRef, 1);
339
340         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
341
342         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
343
344         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
345     }
346
347     @Test
348     public void testWriteAfterAsyncRead() throws Throwable {
349         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
350
351         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
352         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
353                 eq(getSystem().actorSelection(actorRef.path())),
354                 eqCreateTransaction(memberName, READ_WRITE));
355
356         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
357                 eq(actorSelection(actorRef)), eqSerializedReadData());
358
359         expectBatchedModifications(actorRef, 1);
360         expectReadyTransaction(actorRef);
361
362         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
363
364         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
365
366         final CountDownLatch readComplete = new CountDownLatch(1);
367         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
368         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
369                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
370                     @Override
371                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
372                         try {
373                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
374                         } catch (Exception e) {
375                             caughtEx.set(e);
376                         } finally {
377                             readComplete.countDown();
378                         }
379                     }
380
381                     @Override
382                     public void onFailure(Throwable t) {
383                         caughtEx.set(t);
384                         readComplete.countDown();
385                     }
386                 });
387
388         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
389
390         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
391
392         if(caughtEx.get() != null) {
393             throw caughtEx.get();
394         }
395
396         // This sends the batched modification.
397         transactionProxy.ready();
398
399         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
400
401         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
402                 BatchedModificationsReply.class);
403     }
404
405     @Test(expected=IllegalStateException.class)
406     public void testWritePreConditionCheck() {
407         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
408         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409     }
410
411     @Test(expected=IllegalStateException.class)
412     public void testWriteAfterReadyPreConditionCheck() {
413         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
414
415         transactionProxy.ready();
416
417         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
418     }
419
420     @Test
421     public void testMerge() throws Exception {
422         dataStoreContextBuilder.shardBatchedModificationCount(1);
423         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
424
425         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
426
427         expectBatchedModifications(actorRef, 1);
428
429         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
430
431         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
432
433         verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
434     }
435
436     @Test
437     public void testDelete() throws Exception {
438         dataStoreContextBuilder.shardBatchedModificationCount(1);
439         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
440
441         expectBatchedModifications(actorRef, 1);
442
443         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
444
445         transactionProxy.delete(TestModel.TEST_PATH);
446
447         verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
448     }
449
450     @Test
451     public void testReadyWithReadWrite() throws Exception {
452         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
453
454         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
455
456         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
457                 eq(actorSelection(actorRef)), eqSerializedReadData());
458
459         expectBatchedModifications(actorRef, 1);
460         expectReadyTransaction(actorRef);
461
462         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
463
464         transactionProxy.read(TestModel.TEST_PATH);
465
466         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
467
468         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
469
470         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
471
472         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
473
474         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
475                 BatchedModificationsReply.class);
476
477         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
478
479         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
480                 isA(BatchedModifications.class));
481
482         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
483                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
484     }
485
486     @Test
487     public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
488         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
489
490         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
491
492         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
493
494         expectBatchedModificationsReady(actorRef, 1);
495
496         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
497
498         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
499
500         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
501
502         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
503
504         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
505
506         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
507
508         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
509
510         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
511         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
512
513         verifyBatchedModifications(batchedModifications.get(0), true,
514                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
515
516         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
517                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
518     }
519
520     @Test
521     public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
522         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
523         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
524
525         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
526
527         expectBatchedModificationsReady(actorRef, 1);
528
529         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
530
531         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
532
533         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
534
535         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
536
537         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
538
539         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
540                 BatchedModificationsReply.class);
541
542         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
543
544         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
545         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
546
547         verifyBatchedModifications(batchedModifications.get(0), false,
548                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
549
550         verifyBatchedModifications(batchedModifications.get(1), true);
551
552         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
553                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
554     }
555
556     @Test
557     public void testReadyWithRecordingOperationFailure() throws Exception {
558         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
559
560         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
561
562         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
563
564         expectFailedBatchedModifications(actorRef);
565
566         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
567
568         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
569
570         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
571
572         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
573
574         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
575
576         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
577
578         verifyCohortFutures(proxy, TestException.class);
579
580         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
581     }
582
583     @Test
584     public void testReadyWithReplyFailure() throws Exception {
585         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
586
587         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
588
589         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
590
591         expectFailedBatchedModifications(actorRef);
592
593         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
594
595         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
596
597         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
598
599         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
600
601         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
602
603         verifyCohortFutures(proxy, TestException.class);
604     }
605
606     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
607         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
608
609         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
610
611         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
612
613         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614
615         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
616
617         transactionProxy.delete(TestModel.TEST_PATH);
618
619         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
620
621         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
622
623         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
624
625         verifyCohortFutures(proxy, toThrow.getClass());
626     }
627
628     @Test
629     public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
630         testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
631     }
632
633     @Test
634     public void testWriteOnlyTxWithNotInitializedException() throws Exception {
635         testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
636     }
637
638     @Test
639     public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
640         testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
641     }
642
643     @Test
644     public void testReadyWithInvalidReplyMessageType() throws Exception {
645         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
646         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
647
648         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
649
650         //expectBatchedModifications(actorRef, 1);
651
652         doReturn(Futures.successful(new Object())).when(mockActorContext).
653                 executeOperationAsync(eq(actorSelection(actorRef)),
654                         isA(BatchedModifications.class));
655
656         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
657
658         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
659
660         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
661
662         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
663
664         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
665
666         verifyCohortFutures(proxy, IllegalArgumentException.class);
667     }
668
669     @Test
670     public void testGetIdentifier() {
671         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
672         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
673                 TransactionProxy.TransactionType.READ_ONLY);
674
675         Object id = transactionProxy.getIdentifier();
676         assertNotNull("getIdentifier returned null", id);
677         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
678     }
679
680     @Test
681     public void testClose() throws Exception{
682         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
683
684         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
685                 eq(actorSelection(actorRef)), eqSerializedReadData());
686
687         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
688
689         transactionProxy.read(TestModel.TEST_PATH);
690
691         transactionProxy.close();
692
693         verify(mockActorContext).sendOperationAsync(
694                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
695     }
696
697
698     /**
699      * Method to test a local Tx actor. The Tx paths are matched to decide if the
700      * Tx actor is local or not. This is done by mocking the Tx actor path
701      * and the caller paths and ensuring that the paths have the remote-address format
702      *
703      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
704      * the paths returned for the actors for all the tests are not qualified remote paths.
705      * Hence are treated as non-local/remote actors. In short, all tests except
706      * few below run for remote actors
707      *
708      * @throws Exception
709      */
710     @Test
711     public void testLocalTxActorRead() throws Exception {
712         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
713         doReturn(true).when(mockActorContext).isPathLocal(anyString());
714
715         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
716
717         // negative test case with null as the reply
718         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
719             any(ActorSelection.class), eqReadData());
720
721         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
722             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
723
724         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
725
726         // test case with node as read data reply
727         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
728
729         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
730             any(ActorSelection.class), eqReadData());
731
732         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
733
734         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
735
736         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
737
738         // test for local data exists
739         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
740             any(ActorSelection.class), eqDataExists());
741
742         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
743
744         assertEquals("Exists response", true, exists);
745     }
746
747     @Test
748     public void testLocalTxActorReady() throws Exception {
749         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
750         doReturn(true).when(mockActorContext).isPathLocal(anyString());
751
752         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
753                 any(ActorSelection.class), isA(BatchedModifications.class));
754
755         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
756
757         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
758         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
759
760         // testing ready
761         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
762             eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
763
764         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
765
766         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
767
768         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
769
770         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
771     }
772
773     private static interface TransactionProxyOperation {
774         void run(TransactionProxy transactionProxy);
775     }
776
777     private void throttleOperation(TransactionProxyOperation operation) {
778         throttleOperation(operation, 1, true);
779     }
780
781     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
782         ActorSystem actorSystem = getSystem();
783         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
784
785         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
786
787         doReturn(actorSystem.actorSelection(shardActorRef.path())).
788                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
789
790         if(shardFound) {
791             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
792                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
793         } else {
794             doReturn(Futures.failed(new Exception("not found")))
795                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
796         }
797
798         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
799         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
800                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
801                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
802
803         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
804                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
805                         eqCreateTransaction(memberName, READ_WRITE));
806
807         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
808
809         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
810
811         long start = System.nanoTime();
812
813         operation.run(transactionProxy);
814
815         long end = System.nanoTime();
816
817         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
818         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
819                 expected, (end-start)), (end - start) > expected);
820
821     }
822
823     private void completeOperation(TransactionProxyOperation operation){
824         completeOperation(operation, true);
825     }
826
827     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
828         ActorSystem actorSystem = getSystem();
829         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
830
831         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
832
833         doReturn(actorSystem.actorSelection(shardActorRef.path())).
834                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
835
836         if(shardFound) {
837             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
838                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
839         } else {
840             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
841                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
842         }
843
844         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
845         String actorPath = txActorRef.path().toString();
846         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
847                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
848                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
849
850         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
851
852         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
853                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
854                         eqCreateTransaction(memberName, READ_WRITE));
855
856         doReturn(true).when(mockActorContext).isPathLocal(anyString());
857
858         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
859
860         long start = System.nanoTime();
861
862         operation.run(transactionProxy);
863
864         long end = System.nanoTime();
865
866         long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
867         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
868                 expected, (end-start)), (end - start) <= expected);
869     }
870
871     public void testWriteThrottling(boolean shardFound){
872
873         throttleOperation(new TransactionProxyOperation() {
874             @Override
875             public void run(TransactionProxy transactionProxy) {
876                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
877
878                 expectBatchedModifications(2);
879
880                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
881
882                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
883             }
884         }, 1, shardFound);
885     }
886
887     @Test
888     public void testWriteThrottlingWhenShardFound(){
889         dataStoreContextBuilder.shardBatchedModificationCount(1);
890         throttleOperation(new TransactionProxyOperation() {
891             @Override
892             public void run(TransactionProxy transactionProxy) {
893                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
894
895                 expectIncompleteBatchedModifications();
896
897                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
898
899                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
900             }
901         });
902     }
903
904     @Test
905     public void testWriteThrottlingWhenShardNotFound(){
906         // Confirm that there is no throttling when the Shard is not found
907         dataStoreContextBuilder.shardBatchedModificationCount(1);
908         completeOperation(new TransactionProxyOperation() {
909             @Override
910             public void run(TransactionProxy transactionProxy) {
911                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
912
913                 expectBatchedModifications(2);
914
915                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
916
917                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
918             }
919         }, false);
920
921     }
922
923
924     @Test
925     public void testWriteCompletion(){
926         dataStoreContextBuilder.shardBatchedModificationCount(1);
927         completeOperation(new TransactionProxyOperation() {
928             @Override
929             public void run(TransactionProxy transactionProxy) {
930                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
931
932                 expectBatchedModifications(2);
933
934                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
935
936                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
937             }
938         });
939     }
940
941     @Test
942     public void testMergeThrottlingWhenShardFound(){
943         dataStoreContextBuilder.shardBatchedModificationCount(1);
944         throttleOperation(new TransactionProxyOperation() {
945             @Override
946             public void run(TransactionProxy transactionProxy) {
947                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
948
949                 expectIncompleteBatchedModifications();
950
951                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
952
953                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
954             }
955         });
956     }
957
958     @Test
959     public void testMergeThrottlingWhenShardNotFound(){
960         dataStoreContextBuilder.shardBatchedModificationCount(1);
961         completeOperation(new TransactionProxyOperation() {
962             @Override
963             public void run(TransactionProxy transactionProxy) {
964                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
965
966                 expectBatchedModifications(2);
967
968                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
969
970                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
971             }
972         }, false);
973     }
974
975     @Test
976     public void testMergeCompletion(){
977         dataStoreContextBuilder.shardBatchedModificationCount(1);
978         completeOperation(new TransactionProxyOperation() {
979             @Override
980             public void run(TransactionProxy transactionProxy) {
981                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
982
983                 expectBatchedModifications(2);
984
985                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
986
987                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
988             }
989         });
990
991     }
992
993     @Test
994     public void testDeleteThrottlingWhenShardFound(){
995
996         throttleOperation(new TransactionProxyOperation() {
997             @Override
998             public void run(TransactionProxy transactionProxy) {
999                 expectIncompleteBatchedModifications();
1000
1001                 transactionProxy.delete(TestModel.TEST_PATH);
1002
1003                 transactionProxy.delete(TestModel.TEST_PATH);
1004             }
1005         });
1006     }
1007
1008
1009     @Test
1010     public void testDeleteThrottlingWhenShardNotFound(){
1011
1012         completeOperation(new TransactionProxyOperation() {
1013             @Override
1014             public void run(TransactionProxy transactionProxy) {
1015                 expectBatchedModifications(2);
1016
1017                 transactionProxy.delete(TestModel.TEST_PATH);
1018
1019                 transactionProxy.delete(TestModel.TEST_PATH);
1020             }
1021         }, false);
1022     }
1023
1024     @Test
1025     public void testDeleteCompletion(){
1026         dataStoreContextBuilder.shardBatchedModificationCount(1);
1027         completeOperation(new TransactionProxyOperation() {
1028             @Override
1029             public void run(TransactionProxy transactionProxy) {
1030                 expectBatchedModifications(2);
1031
1032                 transactionProxy.delete(TestModel.TEST_PATH);
1033
1034                 transactionProxy.delete(TestModel.TEST_PATH);
1035             }
1036         });
1037
1038     }
1039
1040     @Test
1041     public void testReadThrottlingWhenShardFound(){
1042
1043         throttleOperation(new TransactionProxyOperation() {
1044             @Override
1045             public void run(TransactionProxy transactionProxy) {
1046                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1047                         any(ActorSelection.class), eqReadData());
1048
1049                 transactionProxy.read(TestModel.TEST_PATH);
1050
1051                 transactionProxy.read(TestModel.TEST_PATH);
1052             }
1053         });
1054     }
1055
1056     @Test
1057     public void testReadThrottlingWhenShardNotFound(){
1058
1059         completeOperation(new TransactionProxyOperation() {
1060             @Override
1061             public void run(TransactionProxy transactionProxy) {
1062                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1063                         any(ActorSelection.class), eqReadData());
1064
1065                 transactionProxy.read(TestModel.TEST_PATH);
1066
1067                 transactionProxy.read(TestModel.TEST_PATH);
1068             }
1069         }, false);
1070     }
1071
1072
1073     @Test
1074     public void testReadCompletion(){
1075         completeOperation(new TransactionProxyOperation() {
1076             @Override
1077             public void run(TransactionProxy transactionProxy) {
1078                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1079
1080                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1081                         any(ActorSelection.class), eqReadData());
1082
1083                 transactionProxy.read(TestModel.TEST_PATH);
1084
1085                 transactionProxy.read(TestModel.TEST_PATH);
1086             }
1087         });
1088
1089     }
1090
1091     @Test
1092     public void testExistsThrottlingWhenShardFound(){
1093
1094         throttleOperation(new TransactionProxyOperation() {
1095             @Override
1096             public void run(TransactionProxy transactionProxy) {
1097                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1098                         any(ActorSelection.class), eqDataExists());
1099
1100                 transactionProxy.exists(TestModel.TEST_PATH);
1101
1102                 transactionProxy.exists(TestModel.TEST_PATH);
1103             }
1104         });
1105     }
1106
1107     @Test
1108     public void testExistsThrottlingWhenShardNotFound(){
1109
1110         completeOperation(new TransactionProxyOperation() {
1111             @Override
1112             public void run(TransactionProxy transactionProxy) {
1113                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1114                         any(ActorSelection.class), eqDataExists());
1115
1116                 transactionProxy.exists(TestModel.TEST_PATH);
1117
1118                 transactionProxy.exists(TestModel.TEST_PATH);
1119             }
1120         }, false);
1121     }
1122
1123
1124     @Test
1125     public void testExistsCompletion(){
1126         completeOperation(new TransactionProxyOperation() {
1127             @Override
1128             public void run(TransactionProxy transactionProxy) {
1129                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1130                         any(ActorSelection.class), eqDataExists());
1131
1132                 transactionProxy.exists(TestModel.TEST_PATH);
1133
1134                 transactionProxy.exists(TestModel.TEST_PATH);
1135             }
1136         });
1137
1138     }
1139
1140     @Test
1141     public void testReadyThrottling(){
1142
1143         throttleOperation(new TransactionProxyOperation() {
1144             @Override
1145             public void run(TransactionProxy transactionProxy) {
1146                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1147
1148                 expectBatchedModifications(1);
1149
1150                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1151                         any(ActorSelection.class), any(ReadyTransaction.class));
1152
1153                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1154
1155                 transactionProxy.ready();
1156             }
1157         });
1158     }
1159
1160     @Test
1161     public void testReadyThrottlingWithTwoTransactionContexts(){
1162
1163         throttleOperation(new TransactionProxyOperation() {
1164             @Override
1165             public void run(TransactionProxy transactionProxy) {
1166                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1167                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1168
1169                 expectBatchedModifications(2);
1170
1171                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1172                         any(ActorSelection.class), any(ReadyTransaction.class));
1173
1174                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1175
1176                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1177
1178                 transactionProxy.ready();
1179             }
1180         }, 2, true);
1181     }
1182
1183     private void testModificationOperationBatching(TransactionType type) throws Exception {
1184         int shardBatchedModificationCount = 3;
1185         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1186
1187         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1188
1189         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1190
1191         expectReadyTransaction(actorRef);
1192
1193         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1194         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1195
1196         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1197         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1198
1199         YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1200         NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1201
1202         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1203         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1204
1205         YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1206         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1207
1208         YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1209         NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1210
1211         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1212         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1213
1214         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1215
1216         transactionProxy.write(writePath1, writeNode1);
1217         transactionProxy.write(writePath2, writeNode2);
1218         transactionProxy.delete(deletePath1);
1219         transactionProxy.merge(mergePath1, mergeNode1);
1220         transactionProxy.merge(mergePath2, mergeNode2);
1221         transactionProxy.write(writePath3, writeNode3);
1222         transactionProxy.merge(mergePath3, mergeNode3);
1223         transactionProxy.delete(deletePath2);
1224
1225         // This sends the last batch.
1226         transactionProxy.ready();
1227
1228         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1229         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1230
1231         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1232                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1233
1234         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1235                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1236
1237         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
1238         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
1239                 new DeleteModification(deletePath2));
1240
1241         if(optimizedWriteOnly) {
1242             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1243                     BatchedModificationsReply.class, BatchedModificationsReply.class);
1244         } else {
1245             verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1246                     BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1247         }
1248     }
1249
1250     @Test
1251     public void testReadWriteModificationOperationBatching() throws Throwable {
1252         testModificationOperationBatching(READ_WRITE);
1253     }
1254
1255     @Test
1256     public void testWriteOnlyModificationOperationBatching() throws Throwable {
1257         testModificationOperationBatching(WRITE_ONLY);
1258     }
1259
1260     @Test
1261     public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1262         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1263         testModificationOperationBatching(WRITE_ONLY);
1264     }
1265
1266     @Test
1267     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1268
1269         int shardBatchedModificationCount = 10;
1270         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1271
1272         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1273
1274         expectBatchedModifications(actorRef, shardBatchedModificationCount);
1275
1276         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1277         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1278
1279         YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1280         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1281
1282         YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1283         NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1284
1285         YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1286         NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1287
1288         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1289
1290         doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1291                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1292
1293         doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1294                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1295
1296         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1297                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1298
1299         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1300
1301         transactionProxy.write(writePath1, writeNode1);
1302         transactionProxy.write(writePath2, writeNode2);
1303
1304         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1305                 get(5, TimeUnit.SECONDS);
1306
1307         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1308         assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1309
1310         transactionProxy.merge(mergePath1, mergeNode1);
1311         transactionProxy.merge(mergePath2, mergeNode2);
1312
1313         readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1314
1315         transactionProxy.delete(deletePath);
1316
1317         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1318         assertEquals("Exists response", true, exists);
1319
1320         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1321         assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1322
1323         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1324         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1325
1326         verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1327                 new WriteModification(writePath2, writeNode2));
1328
1329         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1330                 new MergeModification(mergePath2, mergeNode2));
1331
1332         verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1333
1334         InOrder inOrder = Mockito.inOrder(mockActorContext);
1335         inOrder.verify(mockActorContext).executeOperationAsync(
1336                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1337
1338         inOrder.verify(mockActorContext).executeOperationAsync(
1339                 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1340
1341         inOrder.verify(mockActorContext).executeOperationAsync(
1342                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1343
1344         inOrder.verify(mockActorContext).executeOperationAsync(
1345                 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1346
1347         inOrder.verify(mockActorContext).executeOperationAsync(
1348                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1349
1350         inOrder.verify(mockActorContext).executeOperationAsync(
1351                 eq(actorSelection(actorRef)), eqSerializedDataExists());
1352
1353         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1354                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1355     }
1356 }