Merge "Cleanup root pom "name"."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
18 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSelection;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.dispatch.Futures;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.FutureCallback;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.Config;
31 import com.typesafe.config.ConfigFactory;
32 import java.io.IOException;
33 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.junit.AfterClass;
38 import org.junit.Assert;
39 import org.junit.Before;
40 import org.junit.BeforeClass;
41 import org.junit.Test;
42 import org.mockito.ArgumentMatcher;
43 import org.mockito.Mock;
44 import org.mockito.Mockito;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
47 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
49 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
52 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
53 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
54 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
55 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
56 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
62 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
65 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
66 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
67 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
68 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
71 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
72 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.Promise;
80 import scala.concurrent.duration.Duration;
81
82 @SuppressWarnings("resource")
83 public class TransactionProxyTest {
84
85     @SuppressWarnings("serial")
86     static class TestException extends RuntimeException {
87     }
88
89     static interface Invoker {
90         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
91     }
92
93     private static ActorSystem system;
94
95     private final Configuration configuration = new MockConfiguration();
96
97     @Mock
98     private ActorContext mockActorContext;
99
100     private SchemaContext schemaContext;
101
102     @Mock
103     private ClusterWrapper mockClusterWrapper;
104
105     String memberName = "mock-member";
106
107     @BeforeClass
108     public static void setUpClass() throws IOException {
109
110         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
111                 put("akka.actor.default-dispatcher.type",
112                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
113                 withFallback(ConfigFactory.load());
114         system = ActorSystem.create("test", config);
115     }
116
117     @AfterClass
118     public static void tearDownClass() throws IOException {
119         JavaTestKit.shutdownActorSystem(system);
120         system = null;
121     }
122
123     @Before
124     public void setUp(){
125         MockitoAnnotations.initMocks(this);
126
127         schemaContext = TestModel.createTestContext();
128
129         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
130
131         doReturn(getSystem()).when(mockActorContext).getActorSystem();
132         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
133         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
134         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
135         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
136         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
137         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
138
139         ShardStrategyFactory.setConfiguration(configuration);
140     }
141
142     private ActorSystem getSystem() {
143         return system;
144     }
145
146     private CreateTransaction eqCreateTransaction(final String memberName,
147             final TransactionType type) {
148         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
149             @Override
150             public boolean matches(Object argument) {
151                 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
152                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
153                     return obj.getTransactionId().startsWith(memberName) &&
154                             obj.getTransactionType() == type.ordinal();
155                 }
156
157                 return false;
158             }
159         };
160
161         return argThat(matcher);
162     }
163
164     private DataExists eqSerializedDataExists() {
165         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
166             @Override
167             public boolean matches(Object argument) {
168                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
169                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
170             }
171         };
172
173         return argThat(matcher);
174     }
175
176     private DataExists eqDataExists() {
177         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
178             @Override
179             public boolean matches(Object argument) {
180                 return (argument instanceof DataExists) &&
181                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
182             }
183         };
184
185         return argThat(matcher);
186     }
187
188     private ReadData eqSerializedReadData() {
189         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
190             @Override
191             public boolean matches(Object argument) {
192                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
193                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
194             }
195         };
196
197         return argThat(matcher);
198     }
199
200     private ReadData eqReadData() {
201         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
202             @Override
203             public boolean matches(Object argument) {
204                 return (argument instanceof ReadData) &&
205                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
206             }
207         };
208
209         return argThat(matcher);
210     }
211
212     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
213         return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
214     }
215
216     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
217             final int transactionVersion) {
218         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
219             @Override
220             public boolean matches(Object argument) {
221                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
222                         WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
223                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
224                            ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
225
226                     WriteData obj = WriteData.fromSerializable(argument);
227                     return obj.getPath().equals(TestModel.TEST_PATH) &&
228                            obj.getData().equals(nodeToWrite);
229                 }
230
231                 return false;
232             }
233         };
234
235         return argThat(matcher);
236     }
237
238     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
239         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
240             @Override
241             public boolean matches(Object argument) {
242                 if(argument instanceof WriteData) {
243                     WriteData obj = (WriteData) argument;
244                     return obj.getPath().equals(TestModel.TEST_PATH) &&
245                         obj.getData().equals(nodeToWrite);
246                 }
247                 return false;
248             }
249         };
250
251         return argThat(matcher);
252     }
253
254     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
255         return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
256     }
257
258     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
259             final int transactionVersion) {
260         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
261             @Override
262             public boolean matches(Object argument) {
263                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
264                         MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
265                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
266                            ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
267
268                     MergeData obj = MergeData.fromSerializable(argument);
269                     return obj.getPath().equals(TestModel.TEST_PATH) &&
270                            obj.getData().equals(nodeToWrite);
271                 }
272
273                 return false;
274             }
275         };
276
277         return argThat(matcher);
278     }
279
280     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
281         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
282             @Override
283             public boolean matches(Object argument) {
284                 if(argument instanceof MergeData) {
285                     MergeData obj = ((MergeData) argument);
286                     return obj.getPath().equals(TestModel.TEST_PATH) &&
287                         obj.getData().equals(nodeToWrite);
288                 }
289
290                return false;
291             }
292         };
293
294         return argThat(matcher);
295     }
296
297     private DeleteData eqSerializedDeleteData() {
298         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
299             @Override
300             public boolean matches(Object argument) {
301                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
302                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
303             }
304         };
305
306         return argThat(matcher);
307     }
308
309         private DeleteData eqDeleteData() {
310         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
311             @Override
312             public boolean matches(Object argument) {
313                 return argument instanceof DeleteData &&
314                     ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
315             }
316         };
317
318         return argThat(matcher);
319     }
320
321     private Future<Object> readySerializedTxReply(String path) {
322         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
323     }
324
325     private Future<Object> readyTxReply(String path) {
326         return Futures.successful((Object)new ReadyTransactionReply(path));
327     }
328
329     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
330             short transactionVersion) {
331         return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
332     }
333
334     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
335         return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
336     }
337
338     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
339         return Futures.successful(new ReadDataReply(data));
340     }
341
342     private Future<Object> dataExistsSerializedReply(boolean exists) {
343         return Futures.successful(new DataExistsReply(exists).toSerializable());
344     }
345
346     private Future<DataExistsReply> dataExistsReply(boolean exists) {
347         return Futures.successful(new DataExistsReply(exists));
348     }
349
350     private Future<Object> writeSerializedDataReply(short version) {
351         return Futures.successful(new WriteDataReply().toSerializable(version));
352     }
353
354     private Future<Object> writeSerializedDataReply() {
355         return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
356     }
357
358     private Future<WriteDataReply> writeDataReply() {
359         return Futures.successful(new WriteDataReply());
360     }
361
362     private Future<Object> mergeSerializedDataReply(short version) {
363         return Futures.successful(new MergeDataReply().toSerializable(version));
364     }
365
366     private Future<Object> mergeSerializedDataReply() {
367         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
368     }
369
370     private Future<Object> incompleteFuture(){
371         return mock(Future.class);
372     }
373
374     private Future<MergeDataReply> mergeDataReply() {
375         return Futures.successful(new MergeDataReply());
376     }
377
378     private Future<Object> deleteSerializedDataReply(short version) {
379         return Futures.successful(new DeleteDataReply().toSerializable(version));
380     }
381
382     private Future<Object> deleteSerializedDataReply() {
383         return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
384     }
385
386     private Future<DeleteDataReply> deleteDataReply() {
387         return Futures.successful(new DeleteDataReply());
388     }
389
390     private ActorSelection actorSelection(ActorRef actorRef) {
391         return getSystem().actorSelection(actorRef.path());
392     }
393
394     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
395         return CreateTransactionReply.newBuilder()
396             .setTransactionActorPath(actorRef.path().toString())
397             .setTransactionId("txn-1")
398             .setMessageVersion(transactionVersion)
399             .build();
400     }
401
402     private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
403         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
404         doReturn(actorSystem.actorSelection(actorRef.path())).
405                 when(mockActorContext).actorSelection(actorRef.path().toString());
406
407         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
408                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
409
410         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
411
412         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
413
414         return actorRef;
415     }
416
417     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
418             TransactionType type, int transactionVersion) {
419         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
420
421         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
422                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
423                         eqCreateTransaction(memberName, type));
424
425         return actorRef;
426     }
427
428     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
429         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
430     }
431
432
433     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
434             throws Throwable {
435
436         try {
437             future.checkedGet(5, TimeUnit.SECONDS);
438             fail("Expected ReadFailedException");
439         } catch(ReadFailedException e) {
440             throw e.getCause();
441         }
442     }
443
444     @Test
445     public void testRead() throws Exception {
446         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
447
448         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
449                 READ_ONLY);
450
451         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
452                 eq(actorSelection(actorRef)), eqSerializedReadData());
453
454         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
455                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
456
457         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
458
459         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
460
461         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
462                 eq(actorSelection(actorRef)), eqSerializedReadData());
463
464         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
465
466         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
467
468         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
469     }
470
471     @Test(expected = ReadFailedException.class)
472     public void testReadWithInvalidReplyMessageType() throws Exception {
473         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
474
475         doReturn(Futures.successful(new Object())).when(mockActorContext).
476                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
477
478         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
479                 READ_ONLY);
480
481         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
482     }
483
484     @Test(expected = TestException.class)
485     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
486         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
487
488         doReturn(Futures.failed(new TestException())).when(mockActorContext).
489                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
490
491         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
492                 READ_ONLY);
493
494         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
495     }
496
497     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
498             throws Throwable {
499         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
500
501         if (exToThrow instanceof PrimaryNotFoundException) {
502             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
503         } else {
504             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
505                     when(mockActorContext).findPrimaryShardAsync(anyString());
506         }
507
508         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
509                 any(ActorSelection.class), any());
510
511         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
512
513         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
514     }
515
516     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
517         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
518             @Override
519             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
520                 return proxy.read(TestModel.TEST_PATH);
521             }
522         });
523     }
524
525     @Test(expected = PrimaryNotFoundException.class)
526     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
527         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
528     }
529
530     @Test(expected = TimeoutException.class)
531     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
532         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
533                 new Exception("reason")));
534     }
535
536     @Test(expected = TestException.class)
537     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
538         testReadWithExceptionOnInitialCreateTransaction(new TestException());
539     }
540
541     @Test(expected = TestException.class)
542     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
543         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
544
545         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
546
547         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
548                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
549
550         doReturn(Futures.failed(new TestException())).when(mockActorContext).
551                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
552
553         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
554                 eq(actorSelection(actorRef)), eqSerializedReadData());
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
557                 READ_WRITE);
558
559         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
560
561         transactionProxy.delete(TestModel.TEST_PATH);
562
563         try {
564             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
565         } finally {
566             verify(mockActorContext, times(0)).executeOperationAsync(
567                     eq(actorSelection(actorRef)), eqSerializedReadData());
568         }
569     }
570
571     @Test
572     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
573         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
574
575         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
576
577         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
578                 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
579
580         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
581                 eq(actorSelection(actorRef)), eqSerializedReadData());
582
583         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
584                 READ_WRITE);
585
586         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
587
588         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
589                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
590
591         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
592
593         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
594     }
595
596     @Test(expected=IllegalStateException.class)
597     public void testReadPreConditionCheck() {
598
599         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
600                 WRITE_ONLY);
601
602         transactionProxy.read(TestModel.TEST_PATH);
603     }
604
605     @Test(expected=IllegalArgumentException.class)
606     public void testInvalidCreateTransactionReply() throws Throwable {
607         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
608
609         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
610             actorSelection(actorRef.path().toString());
611
612         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
613             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
614
615         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
616             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
617
618         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
619
620         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
621     }
622
623     @Test
624     public void testExists() throws Exception {
625         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
626
627         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
628                 READ_ONLY);
629
630         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
631                 eq(actorSelection(actorRef)), eqSerializedDataExists());
632
633         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
634
635         assertEquals("Exists response", false, exists);
636
637         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
638                 eq(actorSelection(actorRef)), eqSerializedDataExists());
639
640         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
641
642         assertEquals("Exists response", true, exists);
643     }
644
645     @Test(expected = PrimaryNotFoundException.class)
646     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
647         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
648             @Override
649             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
650                 return proxy.exists(TestModel.TEST_PATH);
651             }
652         });
653     }
654
655     @Test(expected = ReadFailedException.class)
656     public void testExistsWithInvalidReplyMessageType() throws Exception {
657         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
658
659         doReturn(Futures.successful(new Object())).when(mockActorContext).
660                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
661
662         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
663                 READ_ONLY);
664
665         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
666     }
667
668     @Test(expected = TestException.class)
669     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
670         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
671
672         doReturn(Futures.failed(new TestException())).when(mockActorContext).
673                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
674
675         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
676                 READ_ONLY);
677
678         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
679     }
680
681     @Test(expected = TestException.class)
682     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
683         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
684
685         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
686
687         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
688                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
689
690         doReturn(Futures.failed(new TestException())).when(mockActorContext).
691                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
692
693         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
694                 eq(actorSelection(actorRef)), eqSerializedDataExists());
695
696         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
697                 READ_WRITE);
698
699         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
700
701         transactionProxy.delete(TestModel.TEST_PATH);
702
703         try {
704             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
705         } finally {
706             verify(mockActorContext, times(0)).executeOperationAsync(
707                     eq(actorSelection(actorRef)), eqSerializedDataExists());
708         }
709     }
710
711     @Test
712     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
713         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
714
715         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
716
717         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
718                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
719
720         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
721                 eq(actorSelection(actorRef)), eqSerializedDataExists());
722
723         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
724                 READ_WRITE);
725
726         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
727
728         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
729
730         assertEquals("Exists response", true, exists);
731     }
732
733     @Test(expected=IllegalStateException.class)
734     public void testExistsPreConditionCheck() {
735
736         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
737                 WRITE_ONLY);
738
739         transactionProxy.exists(TestModel.TEST_PATH);
740     }
741
742     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
743             Class<?>... expResultTypes) throws Exception {
744         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
745
746         int i = 0;
747         for( Future<Object> future: futures) {
748             assertNotNull("Recording operation Future is null", future);
749
750             Class<?> expResultType = expResultTypes[i++];
751             if(Throwable.class.isAssignableFrom(expResultType)) {
752                 try {
753                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
754                     fail("Expected exception from recording operation Future");
755                 } catch(Exception e) {
756                     // Expected
757                 }
758             } else {
759                 assertEquals("Recording operation Future result type", expResultType,
760                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
761             }
762         }
763     }
764
765     @Test
766     public void testWrite() throws Exception {
767         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
768
769         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
770
771         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
772                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
773
774         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
775                 WRITE_ONLY);
776
777         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
778
779         verify(mockActorContext).executeOperationAsync(
780                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
781
782         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
783                 WriteDataReply.class);
784     }
785
786     @Test
787     public void testWriteAfterAsyncRead() throws Throwable {
788         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
789
790         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
791         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
792                 eq(getSystem().actorSelection(actorRef.path())),
793                 eqCreateTransaction(memberName, READ_WRITE));
794
795         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
796                 eq(actorSelection(actorRef)), eqSerializedReadData());
797
798         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
799
800         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
801                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
802
803         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
804
805         final CountDownLatch readComplete = new CountDownLatch(1);
806         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
807         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
808                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
809                     @Override
810                     public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
811                         try {
812                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
813                         } catch (Exception e) {
814                             caughtEx.set(e);
815                         } finally {
816                             readComplete.countDown();
817                         }
818                     }
819
820                     @Override
821                     public void onFailure(Throwable t) {
822                         caughtEx.set(t);
823                         readComplete.countDown();
824                     }
825                 });
826
827         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
828
829         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
830
831         if(caughtEx.get() != null) {
832             throw caughtEx.get();
833         }
834
835         verify(mockActorContext).executeOperationAsync(
836                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
837
838         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
839                 WriteDataReply.class);
840     }
841
842     @Test(expected=IllegalStateException.class)
843     public void testWritePreConditionCheck() {
844
845         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
846                 READ_ONLY);
847
848         transactionProxy.write(TestModel.TEST_PATH,
849                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
850     }
851
852     @Test(expected=IllegalStateException.class)
853     public void testWriteAfterReadyPreConditionCheck() {
854
855         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
856                 WRITE_ONLY);
857
858         transactionProxy.ready();
859
860         transactionProxy.write(TestModel.TEST_PATH,
861                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
862     }
863
864     @Test
865     public void testMerge() throws Exception {
866         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
867
868         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
869
870         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
871                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
872
873         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
874
875         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
876
877         verify(mockActorContext).executeOperationAsync(
878                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
879
880         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
881                 MergeDataReply.class);
882     }
883
884     @Test
885     public void testDelete() throws Exception {
886         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
887
888         doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
889                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
890
891         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
892                 WRITE_ONLY);
893
894         transactionProxy.delete(TestModel.TEST_PATH);
895
896         verify(mockActorContext).executeOperationAsync(
897                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
898
899         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
900                 DeleteDataReply.class);
901     }
902
903     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
904         Object... expReplies) throws Exception {
905         assertEquals("getReadyOperationFutures size", expReplies.length,
906                 proxy.getCohortFutures().size());
907
908         int i = 0;
909         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
910             assertNotNull("Ready operation Future is null", future);
911
912             Object expReply = expReplies[i++];
913             if(expReply instanceof ActorSelection) {
914                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
915                 assertEquals("Cohort actor path", expReply, actual);
916             } else {
917                 // Expecting exception.
918                 try {
919                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
920                     fail("Expected exception from ready operation Future");
921                 } catch(Exception e) {
922                     // Expected
923                 }
924             }
925         }
926     }
927
928     @Test
929     public void testReady() throws Exception {
930         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
931
932         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
933
934         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
935                 eq(actorSelection(actorRef)), eqSerializedReadData());
936
937         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
938                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
939
940         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
941                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
942
943         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
944                 READ_WRITE);
945
946         transactionProxy.read(TestModel.TEST_PATH);
947
948         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
949
950         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
951
952         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
953
954         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
955
956         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
957                 WriteDataReply.class);
958
959         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
960     }
961
962     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
963         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
964                 READ_WRITE, version);
965
966         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
967
968         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
969                 eq(actorSelection(actorRef)), eqSerializedReadData());
970
971         doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
972                 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
973
974         doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
975                 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
976
977         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
978                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
979
980         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
981                 eq(actorRef.path().toString()));
982
983         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
984
985         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
986                 get(5, TimeUnit.SECONDS);
987
988         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
989         assertEquals("Response NormalizedNode", testNode, readOptional.get());
990
991         transactionProxy.write(TestModel.TEST_PATH, testNode);
992
993         transactionProxy.merge(TestModel.TEST_PATH, testNode);
994
995         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
996
997         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
998
999         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1000
1001         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1002                 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
1003
1004         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
1005
1006         return actorRef;
1007     }
1008
1009     @Test
1010     public void testCompatibilityWithBaseHeliumVersion() throws Exception {
1011         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
1012
1013         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
1014                 eq(actorRef.path().toString()));
1015     }
1016
1017     @Test
1018     public void testCompatibilityWithHeliumR1Version() throws Exception {
1019         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
1020
1021         verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
1022                 eq(actorRef.path().toString()));
1023     }
1024
1025     @Test
1026     public void testReadyWithRecordingOperationFailure() throws Exception {
1027         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1028
1029         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1030
1031         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1032                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1033
1034         doReturn(Futures.failed(new TestException())).when(mockActorContext).
1035                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1036
1037         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1038                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
1039
1040         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
1041
1042         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1043                 WRITE_ONLY);
1044
1045         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1046
1047         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1048
1049         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1050
1051         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1052
1053         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1054
1055         verifyCohortFutures(proxy, TestException.class);
1056
1057         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1058                 MergeDataReply.class, TestException.class);
1059     }
1060
1061     @Test
1062     public void testReadyWithReplyFailure() throws Exception {
1063         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1064
1065         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1066
1067         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1068                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1069
1070         doReturn(Futures.failed(new TestException())).when(mockActorContext).
1071                 executeOperationAsync(eq(actorSelection(actorRef)),
1072                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1073
1074         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1075                 WRITE_ONLY);
1076
1077         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1078
1079         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1080
1081         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1082
1083         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1084
1085         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1086                 MergeDataReply.class);
1087
1088         verifyCohortFutures(proxy, TestException.class);
1089     }
1090
1091     @Test
1092     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1093
1094         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1095                 mockActorContext).findPrimaryShardAsync(anyString());
1096
1097         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1098                 WRITE_ONLY);
1099
1100         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1101
1102         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1103
1104         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1105
1106         transactionProxy.delete(TestModel.TEST_PATH);
1107
1108         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1109
1110         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1111
1112         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1113
1114         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
1115     }
1116
1117     @Test
1118     public void testReadyWithInvalidReplyMessageType() throws Exception {
1119         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1120
1121         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1122
1123         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1124                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1125
1126         doReturn(Futures.successful(new Object())).when(mockActorContext).
1127                 executeOperationAsync(eq(actorSelection(actorRef)),
1128                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1129
1130         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1131                 WRITE_ONLY);
1132
1133         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1134
1135         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1136
1137         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1138
1139         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1140
1141         verifyCohortFutures(proxy, IllegalArgumentException.class);
1142     }
1143
1144     @Test
1145     public void testGetIdentifier() {
1146         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1147         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1148                 TransactionProxy.TransactionType.READ_ONLY);
1149
1150         Object id = transactionProxy.getIdentifier();
1151         assertNotNull("getIdentifier returned null", id);
1152         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1153     }
1154
1155     @Test
1156     public void testClose() throws Exception{
1157         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1158
1159         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1160                 eq(actorSelection(actorRef)), eqSerializedReadData());
1161
1162         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1163                 READ_WRITE);
1164
1165         transactionProxy.read(TestModel.TEST_PATH);
1166
1167         transactionProxy.close();
1168
1169         verify(mockActorContext).sendOperationAsync(
1170                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1171     }
1172
1173
1174     /**
1175      * Method to test a local Tx actor. The Tx paths are matched to decide if the
1176      * Tx actor is local or not. This is done by mocking the Tx actor path
1177      * and the caller paths and ensuring that the paths have the remote-address format
1178      *
1179      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1180      * the paths returned for the actors for all the tests are not qualified remote paths.
1181      * Hence are treated as non-local/remote actors. In short, all tests except
1182      * few below run for remote actors
1183      *
1184      * @throws Exception
1185      */
1186     @Test
1187     public void testLocalTxActorRead() throws Exception {
1188         ActorSystem actorSystem = getSystem();
1189         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1190
1191         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1192             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1193
1194         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1195             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1196
1197         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1198         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1199             .setTransactionId("txn-1")
1200             .setTransactionActorPath(actorPath)
1201             .build();
1202
1203         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1204             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1205                 eqCreateTransaction(memberName, READ_ONLY));
1206
1207         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1208
1209         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1210
1211         // negative test case with null as the reply
1212         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1213             any(ActorSelection.class), eqReadData());
1214
1215         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1216             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1217
1218         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1219
1220         // test case with node as read data reply
1221         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1222
1223         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1224             any(ActorSelection.class), eqReadData());
1225
1226         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1227
1228         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1229
1230         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1231
1232         // test for local data exists
1233         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1234             any(ActorSelection.class), eqDataExists());
1235
1236         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1237
1238         assertEquals("Exists response", true, exists);
1239     }
1240
1241     @Test
1242     public void testLocalTxActorWrite() throws Exception {
1243         ActorSystem actorSystem = getSystem();
1244         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1245
1246         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1247             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1248
1249         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1250             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1251
1252         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1253         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1254             .setTransactionId("txn-1")
1255             .setTransactionActorPath(actorPath)
1256             .build();
1257
1258         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1259         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1260                 eqCreateTransaction(memberName, WRITE_ONLY));
1261
1262         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1263
1264         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1265
1266         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1267             any(ActorSelection.class), eqWriteData(nodeToWrite));
1268
1269         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1270         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1271
1272         verify(mockActorContext).executeOperationAsync(
1273             any(ActorSelection.class), eqWriteData(nodeToWrite));
1274
1275         //testing local merge
1276         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1277             any(ActorSelection.class), eqMergeData(nodeToWrite));
1278
1279         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1280
1281         verify(mockActorContext).executeOperationAsync(
1282             any(ActorSelection.class), eqMergeData(nodeToWrite));
1283
1284
1285         //testing local delete
1286         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1287             any(ActorSelection.class), eqDeleteData());
1288
1289         transactionProxy.delete(TestModel.TEST_PATH);
1290
1291         verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1292
1293         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1294             WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1295
1296         // testing ready
1297         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1298             any(ActorSelection.class), isA(ReadyTransaction.class));
1299
1300         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1301
1302         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1303
1304         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1305
1306         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1307     }
1308
1309     private static interface TransactionProxyOperation {
1310         void run(TransactionProxy transactionProxy);
1311     }
1312
1313     private void throttleOperation(TransactionProxyOperation operation) {
1314         throttleOperation(operation, 1, true);
1315     }
1316
1317     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
1318         ActorSystem actorSystem = getSystem();
1319         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1320
1321         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
1322
1323         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1324                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1325
1326         if(shardFound) {
1327             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1328                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1329         } else {
1330             doReturn(Futures.failed(new Exception("not found")))
1331                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1332         }
1333
1334         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1335         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1336                 .setTransactionId("txn-1")
1337                 .setTransactionActorPath(actorPath)
1338                 .build();
1339
1340         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1341                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1342                         eqCreateTransaction(memberName, READ_WRITE));
1343
1344         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1345
1346         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1347
1348         long start = System.nanoTime();
1349
1350         operation.run(transactionProxy);
1351
1352         long end = System.nanoTime();
1353
1354         Assert.assertTrue(String.format("took less time than expected %s was %s",
1355                 TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
1356                 (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
1357
1358     }
1359
1360     private void completeOperation(TransactionProxyOperation operation){
1361         completeOperation(operation, true);
1362     }
1363
1364     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
1365         ActorSystem actorSystem = getSystem();
1366         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1367
1368         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
1369
1370         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1371                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1372
1373         if(shardFound) {
1374             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1375                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1376         } else {
1377             doReturn(Futures.failed(new Exception("not found")))
1378                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1379         }
1380
1381         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1382         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1383                 .setTransactionId("txn-1")
1384                 .setTransactionActorPath(actorPath)
1385                 .build();
1386
1387         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1388                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1389                         eqCreateTransaction(memberName, READ_WRITE));
1390
1391         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1392
1393         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1394
1395         long start = System.nanoTime();
1396
1397         operation.run(transactionProxy);
1398
1399         long end = System.nanoTime();
1400
1401         Assert.assertTrue(String.format("took more time than expected %s was %s",
1402                 TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
1403                 (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
1404     }
1405
1406     public void testWriteThrottling(boolean shardFound){
1407
1408         throttleOperation(new TransactionProxyOperation() {
1409             @Override
1410             public void run(TransactionProxy transactionProxy) {
1411                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1412
1413                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1414                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1415
1416                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1417
1418                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1419             }
1420         }, 1, shardFound);
1421     }
1422
1423     @Test
1424     public void testWriteThrottlingWhenShardFound(){
1425         throttleOperation(new TransactionProxyOperation() {
1426             @Override
1427             public void run(TransactionProxy transactionProxy) {
1428                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1429
1430                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1431                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1432
1433                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1434
1435                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1436             }
1437         });
1438
1439     }
1440
1441     @Test
1442     public void testWriteThrottlingWhenShardNotFound(){
1443         // Confirm that there is no throttling when the Shard is not found
1444         completeOperation(new TransactionProxyOperation() {
1445             @Override
1446             public void run(TransactionProxy transactionProxy) {
1447                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1448
1449                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1450                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1451
1452                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1453
1454                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1455             }
1456         }, false);
1457
1458     }
1459
1460
1461     @Test
1462     public void testWriteCompletion(){
1463         completeOperation(new TransactionProxyOperation() {
1464             @Override
1465             public void run(TransactionProxy transactionProxy) {
1466                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1467
1468                 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1469                         any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
1470
1471                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1472
1473                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1474             }
1475         });
1476
1477     }
1478
1479     @Test
1480     public void testMergeThrottlingWhenShardFound(){
1481
1482         throttleOperation(new TransactionProxyOperation() {
1483             @Override
1484             public void run(TransactionProxy transactionProxy) {
1485                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1486
1487                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1488                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1489
1490                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1491
1492                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1493             }
1494         });
1495     }
1496
1497     @Test
1498     public void testMergeThrottlingWhenShardNotFound(){
1499
1500         completeOperation(new TransactionProxyOperation() {
1501             @Override
1502             public void run(TransactionProxy transactionProxy) {
1503                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1504
1505                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1506                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1507
1508                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1509
1510                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1511             }
1512         }, false);
1513     }
1514
1515     @Test
1516     public void testMergeCompletion(){
1517         completeOperation(new TransactionProxyOperation() {
1518             @Override
1519             public void run(TransactionProxy transactionProxy) {
1520                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1521
1522                 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1523                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1524
1525                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1526
1527                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1528             }
1529         });
1530
1531     }
1532
1533     @Test
1534     public void testDeleteThrottlingWhenShardFound(){
1535
1536         throttleOperation(new TransactionProxyOperation() {
1537             @Override
1538             public void run(TransactionProxy transactionProxy) {
1539                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1540                         any(ActorSelection.class), eqDeleteData());
1541
1542                 transactionProxy.delete(TestModel.TEST_PATH);
1543
1544                 transactionProxy.delete(TestModel.TEST_PATH);
1545             }
1546         });
1547     }
1548
1549
1550     @Test
1551     public void testDeleteThrottlingWhenShardNotFound(){
1552
1553         completeOperation(new TransactionProxyOperation() {
1554             @Override
1555             public void run(TransactionProxy transactionProxy) {
1556                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1557                         any(ActorSelection.class), eqDeleteData());
1558
1559                 transactionProxy.delete(TestModel.TEST_PATH);
1560
1561                 transactionProxy.delete(TestModel.TEST_PATH);
1562             }
1563         }, false);
1564     }
1565
1566     @Test
1567     public void testDeleteCompletion(){
1568         completeOperation(new TransactionProxyOperation() {
1569             @Override
1570             public void run(TransactionProxy transactionProxy) {
1571                 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1572                         any(ActorSelection.class), eqDeleteData());
1573
1574                 transactionProxy.delete(TestModel.TEST_PATH);
1575
1576                 transactionProxy.delete(TestModel.TEST_PATH);
1577             }
1578         });
1579
1580     }
1581
1582     @Test
1583     public void testReadThrottlingWhenShardFound(){
1584
1585         throttleOperation(new TransactionProxyOperation() {
1586             @Override
1587             public void run(TransactionProxy transactionProxy) {
1588                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1589                         any(ActorSelection.class), eqReadData());
1590
1591                 transactionProxy.read(TestModel.TEST_PATH);
1592
1593                 transactionProxy.read(TestModel.TEST_PATH);
1594             }
1595         });
1596     }
1597
1598     @Test
1599     public void testReadThrottlingWhenShardNotFound(){
1600
1601         completeOperation(new TransactionProxyOperation() {
1602             @Override
1603             public void run(TransactionProxy transactionProxy) {
1604                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1605                         any(ActorSelection.class), eqReadData());
1606
1607                 transactionProxy.read(TestModel.TEST_PATH);
1608
1609                 transactionProxy.read(TestModel.TEST_PATH);
1610             }
1611         }, false);
1612     }
1613
1614
1615     @Test
1616     public void testReadCompletion(){
1617         completeOperation(new TransactionProxyOperation() {
1618             @Override
1619             public void run(TransactionProxy transactionProxy) {
1620                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1621
1622                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1623                         any(ActorSelection.class), eqReadData());
1624
1625                 transactionProxy.read(TestModel.TEST_PATH);
1626
1627                 transactionProxy.read(TestModel.TEST_PATH);
1628             }
1629         });
1630
1631     }
1632
1633     @Test
1634     public void testExistsThrottlingWhenShardFound(){
1635
1636         throttleOperation(new TransactionProxyOperation() {
1637             @Override
1638             public void run(TransactionProxy transactionProxy) {
1639                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1640                         any(ActorSelection.class), eqDataExists());
1641
1642                 transactionProxy.exists(TestModel.TEST_PATH);
1643
1644                 transactionProxy.exists(TestModel.TEST_PATH);
1645             }
1646         });
1647     }
1648
1649     @Test
1650     public void testExistsThrottlingWhenShardNotFound(){
1651
1652         completeOperation(new TransactionProxyOperation() {
1653             @Override
1654             public void run(TransactionProxy transactionProxy) {
1655                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1656                         any(ActorSelection.class), eqDataExists());
1657
1658                 transactionProxy.exists(TestModel.TEST_PATH);
1659
1660                 transactionProxy.exists(TestModel.TEST_PATH);
1661             }
1662         }, false);
1663     }
1664
1665
1666     @Test
1667     public void testExistsCompletion(){
1668         completeOperation(new TransactionProxyOperation() {
1669             @Override
1670             public void run(TransactionProxy transactionProxy) {
1671                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1672                         any(ActorSelection.class), eqDataExists());
1673
1674                 transactionProxy.exists(TestModel.TEST_PATH);
1675
1676                 transactionProxy.exists(TestModel.TEST_PATH);
1677             }
1678         });
1679
1680     }
1681
1682     @Test
1683     public void testReadyThrottling(){
1684
1685         throttleOperation(new TransactionProxyOperation() {
1686             @Override
1687             public void run(TransactionProxy transactionProxy) {
1688                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1689
1690                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1691                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1692
1693                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1694                         any(ActorSelection.class), any(ReadyTransaction.class));
1695
1696                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1697
1698                 transactionProxy.ready();
1699             }
1700         });
1701     }
1702
1703     @Test
1704     public void testReadyThrottlingWithTwoTransactionContexts(){
1705
1706         throttleOperation(new TransactionProxyOperation() {
1707             @Override
1708             public void run(TransactionProxy transactionProxy) {
1709                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1710                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1711
1712                 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1713                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1714
1715                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1716                         any(ActorSelection.class), eqWriteData(carsNode));
1717
1718                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1719                         any(ActorSelection.class), any(ReadyTransaction.class));
1720
1721                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1722
1723                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1724
1725                 transactionProxy.ready();
1726             }
1727         }, 2, true);
1728     }
1729 }