Fix for Bug 2290.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSelection;
5 import akka.actor.ActorSystem;
6 import akka.actor.Props;
7 import akka.dispatch.Futures;
8 import akka.testkit.JavaTestKit;
9 import com.google.common.base.Optional;
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.typesafe.config.Config;
13 import com.typesafe.config.ConfigFactory;
14 import org.junit.AfterClass;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
22 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
28 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50 import scala.concurrent.Await;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.Duration;
53 import java.io.IOException;
54 import java.util.List;
55 import java.util.concurrent.TimeUnit;
56 import static org.junit.Assert.assertEquals;
57 import static org.junit.Assert.assertNotNull;
58 import static org.junit.Assert.assertTrue;
59 import static org.junit.Assert.fail;
60 import static org.mockito.Matchers.any;
61 import static org.mockito.Matchers.anyString;
62 import static org.mockito.Mockito.argThat;
63 import static org.mockito.Mockito.doReturn;
64 import static org.mockito.Mockito.eq;
65 import static org.mockito.Mockito.isA;
66 import static org.mockito.Mockito.times;
67 import static org.mockito.Mockito.verify;
68 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
69 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
70 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
71
72 @SuppressWarnings("resource")
73 public class TransactionProxyTest {
74
75     @SuppressWarnings("serial")
76     static class TestException extends RuntimeException {
77     }
78
79     static interface Invoker {
80         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
81     }
82
83     private static ActorSystem system;
84
85     private final Configuration configuration = new MockConfiguration();
86
87     @Mock
88     private ActorContext mockActorContext;
89
90     private SchemaContext schemaContext;
91
92     @Mock
93     private ClusterWrapper mockClusterWrapper;
94
95     String memberName = "mock-member";
96
97     @BeforeClass
98     public static void setUpClass() throws IOException {
99
100         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
101                 put("akka.actor.default-dispatcher.type",
102                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
103                 withFallback(ConfigFactory.load());
104         system = ActorSystem.create("test", config);
105     }
106
107     @AfterClass
108     public static void tearDownClass() throws IOException {
109         JavaTestKit.shutdownActorSystem(system);
110         system = null;
111     }
112
113     @Before
114     public void setUp(){
115         MockitoAnnotations.initMocks(this);
116
117         schemaContext = TestModel.createTestContext();
118
119         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
120
121         doReturn(getSystem()).when(mockActorContext).getActorSystem();
122         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
123         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
124         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
125         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
126         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
127
128         ShardStrategyFactory.setConfiguration(configuration);
129     }
130
131     private ActorSystem getSystem() {
132         return system;
133     }
134
135     private CreateTransaction eqCreateTransaction(final String memberName,
136             final TransactionType type) {
137         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
138             @Override
139             public boolean matches(Object argument) {
140                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
141                 return obj.getTransactionId().startsWith(memberName) &&
142                        obj.getTransactionType() == type.ordinal();
143             }
144         };
145
146         return argThat(matcher);
147     }
148
149     private DataExists eqSerializedDataExists() {
150         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
151             @Override
152             public boolean matches(Object argument) {
153                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
154                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
155             }
156         };
157
158         return argThat(matcher);
159     }
160
161     private DataExists eqDataExists() {
162         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
163             @Override
164             public boolean matches(Object argument) {
165                 return (argument instanceof DataExists) &&
166                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
167             }
168         };
169
170         return argThat(matcher);
171     }
172
173     private ReadData eqSerializedReadData() {
174         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
175             @Override
176             public boolean matches(Object argument) {
177                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
179             }
180         };
181
182         return argThat(matcher);
183     }
184
185     private ReadData eqReadData() {
186         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
187             @Override
188             public boolean matches(Object argument) {
189                 return (argument instanceof ReadData) &&
190                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
191             }
192         };
193
194         return argThat(matcher);
195     }
196
197     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
198         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
199             @Override
200             public boolean matches(Object argument) {
201                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
202                     return false;
203                 }
204
205                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
206                 return obj.getPath().equals(TestModel.TEST_PATH) &&
207                        obj.getData().equals(nodeToWrite);
208             }
209         };
210
211         return argThat(matcher);
212     }
213
214     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
215         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
216             @Override
217             public boolean matches(Object argument) {
218                 if(argument instanceof WriteData) {
219                     WriteData obj = (WriteData) argument;
220                     return obj.getPath().equals(TestModel.TEST_PATH) &&
221                         obj.getData().equals(nodeToWrite);
222                 }
223                 return false;
224             }
225         };
226
227         return argThat(matcher);
228     }
229
230     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
231         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
232             @Override
233             public boolean matches(Object argument) {
234                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
235                     return false;
236                 }
237
238                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
239                 return obj.getPath().equals(TestModel.TEST_PATH) &&
240                        obj.getData().equals(nodeToWrite);
241             }
242         };
243
244         return argThat(matcher);
245     }
246
247     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
248         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
249             @Override
250             public boolean matches(Object argument) {
251                 if(argument instanceof MergeData) {
252                     MergeData obj = ((MergeData) argument);
253                     return obj.getPath().equals(TestModel.TEST_PATH) &&
254                         obj.getData().equals(nodeToWrite);
255                 }
256
257                return false;
258             }
259         };
260
261         return argThat(matcher);
262     }
263
264     private DeleteData eqSerializedDeleteData() {
265         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
266             @Override
267             public boolean matches(Object argument) {
268                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
269                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
270             }
271         };
272
273         return argThat(matcher);
274     }
275
276         private DeleteData eqDeleteData() {
277         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
278             @Override
279             public boolean matches(Object argument) {
280                 return argument instanceof DeleteData &&
281                     ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
282             }
283         };
284
285         return argThat(matcher);
286     }
287
288     private Future<Object> readySerializedTxReply(String path) {
289         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
290     }
291
292     private Future<Object> readyTxReply(String path) {
293         return Futures.successful((Object)new ReadyTransactionReply(path));
294     }
295
296
297     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
298         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
299     }
300
301     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
302         return Futures.successful(new ReadDataReply(schemaContext, data));
303     }
304
305     private Future<Object> dataExistsSerializedReply(boolean exists) {
306         return Futures.successful(new DataExistsReply(exists).toSerializable());
307     }
308
309     private Future<DataExistsReply> dataExistsReply(boolean exists) {
310         return Futures.successful(new DataExistsReply(exists));
311     }
312
313     private Future<Object> writeSerializedDataReply() {
314         return Futures.successful(new WriteDataReply().toSerializable());
315     }
316
317     private Future<WriteDataReply> writeDataReply() {
318         return Futures.successful(new WriteDataReply());
319     }
320
321     private Future<Object> mergeSerializedDataReply() {
322         return Futures.successful(new MergeDataReply().toSerializable());
323     }
324
325     private Future<MergeDataReply> mergeDataReply() {
326         return Futures.successful(new MergeDataReply());
327     }
328
329     private Future<Object> deleteSerializedDataReply() {
330         return Futures.successful(new DeleteDataReply().toSerializable());
331     }
332
333     private Future<DeleteDataReply> deleteDataReply() {
334         return Futures.successful(new DeleteDataReply());
335     }
336
337     private ActorSelection actorSelection(ActorRef actorRef) {
338         return getSystem().actorSelection(actorRef.path());
339     }
340
341     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
342         return CreateTransactionReply.newBuilder()
343             .setTransactionActorPath(actorRef.path().toString())
344             .setTransactionId("txn-1").build();
345     }
346
347     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
348         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
349         doReturn(actorSystem.actorSelection(actorRef.path())).
350                 when(mockActorContext).actorSelection(actorRef.path().toString());
351
352         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
353                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
354
355         doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
356                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
357                         eqCreateTransaction(memberName, type));
358
359         doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
360
361         return actorRef;
362     }
363
364     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
365             throws Throwable {
366
367         try {
368             future.checkedGet(5, TimeUnit.SECONDS);
369             fail("Expected ReadFailedException");
370         } catch(ReadFailedException e) {
371             throw e.getCause();
372         }
373     }
374
375     @Test
376     public void testRead() throws Exception {
377         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
378
379         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
380                 READ_ONLY);
381
382         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
383                 eq(actorSelection(actorRef)), eqSerializedReadData());
384
385         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
386                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
387
388         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
389
390         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
391
392         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
393                 eq(actorSelection(actorRef)), eqSerializedReadData());
394
395         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
396
397         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
398
399         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
400     }
401
402     @Test(expected = ReadFailedException.class)
403     public void testReadWithInvalidReplyMessageType() throws Exception {
404         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
405
406         doReturn(Futures.successful(new Object())).when(mockActorContext).
407                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
408
409         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
410                 READ_ONLY);
411
412         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
413     }
414
415     @Test(expected = TestException.class)
416     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
417         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
418
419         doReturn(Futures.failed(new TestException())).when(mockActorContext).
420                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
421
422         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
423                 READ_ONLY);
424
425         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
426     }
427
428     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
429             throws Throwable {
430         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
431
432         if (exToThrow instanceof PrimaryNotFoundException) {
433             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
434         } else {
435             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
436                     when(mockActorContext).findPrimaryShardAsync(anyString());
437         }
438
439         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
440                 any(ActorSelection.class), any());
441
442         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
443
444         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
445     }
446
447     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
448         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
449             @Override
450             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
451                 return proxy.read(TestModel.TEST_PATH);
452             }
453         });
454     }
455
456     @Test(expected = PrimaryNotFoundException.class)
457     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
458         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
459     }
460
461     @Test(expected = TimeoutException.class)
462     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
463         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
464                 new Exception("reason")));
465     }
466
467     @Test(expected = TestException.class)
468     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
469         testReadWithExceptionOnInitialCreateTransaction(new TestException());
470     }
471
472     @Test(expected = TestException.class)
473     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
474         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
475
476         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
477
478         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
479                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
480
481         doReturn(Futures.failed(new TestException())).when(mockActorContext).
482                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
483
484         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
485                 eq(actorSelection(actorRef)), eqSerializedReadData());
486
487         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
488                 READ_WRITE);
489
490         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
491
492         transactionProxy.delete(TestModel.TEST_PATH);
493
494         try {
495             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
496         } finally {
497             verify(mockActorContext, times(0)).executeOperationAsync(
498                     eq(actorSelection(actorRef)), eqSerializedReadData());
499         }
500     }
501
502     @Test
503     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
504         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
505
506         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
507
508         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
509                 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
510
511         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
512                 eq(actorSelection(actorRef)), eqSerializedReadData());
513
514         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
515                 READ_WRITE);
516
517         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
518
519         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
520                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
521
522         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
523
524         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
525     }
526
527     @Test(expected=IllegalStateException.class)
528     public void testReadPreConditionCheck() {
529
530         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
531                 WRITE_ONLY);
532
533         transactionProxy.read(TestModel.TEST_PATH);
534     }
535
536     @Test(expected=IllegalArgumentException.class)
537     public void testInvalidCreateTransactionReply() throws Throwable {
538         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
539
540         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
541             actorSelection(actorRef.path().toString());
542
543         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
544             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
545
546         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
547             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
548
549         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
550
551         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
552     }
553
554     @Test
555     public void testExists() throws Exception {
556         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
557
558         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
559                 READ_ONLY);
560
561         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
562                 eq(actorSelection(actorRef)), eqSerializedDataExists());
563
564         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
565
566         assertEquals("Exists response", false, exists);
567
568         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
569                 eq(actorSelection(actorRef)), eqSerializedDataExists());
570
571         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
572
573         assertEquals("Exists response", true, exists);
574     }
575
576     @Test(expected = PrimaryNotFoundException.class)
577     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
578         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
579             @Override
580             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
581                 return proxy.exists(TestModel.TEST_PATH);
582             }
583         });
584     }
585
586     @Test(expected = ReadFailedException.class)
587     public void testExistsWithInvalidReplyMessageType() throws Exception {
588         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
589
590         doReturn(Futures.successful(new Object())).when(mockActorContext).
591                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
592
593         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
594                 READ_ONLY);
595
596         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
597     }
598
599     @Test(expected = TestException.class)
600     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
601         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
602
603         doReturn(Futures.failed(new TestException())).when(mockActorContext).
604                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
605
606         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
607                 READ_ONLY);
608
609         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
610     }
611
612     @Test(expected = TestException.class)
613     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
614         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
615
616         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
617
618         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
619                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
620
621         doReturn(Futures.failed(new TestException())).when(mockActorContext).
622                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
623
624         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
625                 eq(actorSelection(actorRef)), eqSerializedDataExists());
626
627         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
628                 READ_WRITE);
629
630         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
631
632         transactionProxy.delete(TestModel.TEST_PATH);
633
634         try {
635             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
636         } finally {
637             verify(mockActorContext, times(0)).executeOperationAsync(
638                     eq(actorSelection(actorRef)), eqSerializedDataExists());
639         }
640     }
641
642     @Test
643     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
644         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
645
646         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
647
648         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
649                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
650
651         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
652                 eq(actorSelection(actorRef)), eqSerializedDataExists());
653
654         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
655                 READ_WRITE);
656
657         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
658
659         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
660
661         assertEquals("Exists response", true, exists);
662     }
663
664     @Test(expected=IllegalStateException.class)
665     public void testxistsPreConditionCheck() {
666
667         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
668                 WRITE_ONLY);
669
670         transactionProxy.exists(TestModel.TEST_PATH);
671     }
672
673     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
674             Class<?>... expResultTypes) throws Exception {
675         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
676
677         int i = 0;
678         for( Future<Object> future: futures) {
679             assertNotNull("Recording operation Future is null", future);
680
681             Class<?> expResultType = expResultTypes[i++];
682             if(Throwable.class.isAssignableFrom(expResultType)) {
683                 try {
684                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
685                     fail("Expected exception from recording operation Future");
686                 } catch(Exception e) {
687                     // Expected
688                 }
689             } else {
690                 assertEquals("Recording operation Future result type", expResultType,
691                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
692             }
693         }
694     }
695
696     @Test
697     public void testWrite() throws Exception {
698         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
699
700         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701
702         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
703                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
704
705         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
706                 WRITE_ONLY);
707
708         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
709
710         verify(mockActorContext).executeOperationAsync(
711                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
712
713         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
714                 WriteDataReply.SERIALIZABLE_CLASS);
715     }
716
717     @Test(expected=IllegalStateException.class)
718     public void testWritePreConditionCheck() {
719
720         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
721                 READ_ONLY);
722
723         transactionProxy.write(TestModel.TEST_PATH,
724                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
725     }
726
727     @Test(expected=IllegalStateException.class)
728     public void testWriteAfterReadyPreConditionCheck() {
729
730         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
731                 WRITE_ONLY);
732
733         transactionProxy.ready();
734
735         transactionProxy.write(TestModel.TEST_PATH,
736                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
737     }
738
739     @Test
740     public void testMerge() throws Exception {
741         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
742
743         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
744
745         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
746                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
747
748         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
749
750         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
751
752         verify(mockActorContext).executeOperationAsync(
753                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
754
755         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
756                 MergeDataReply.SERIALIZABLE_CLASS);
757     }
758
759     @Test
760     public void testDelete() throws Exception {
761         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
762
763         doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
764                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
765
766         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
767                 WRITE_ONLY);
768
769         transactionProxy.delete(TestModel.TEST_PATH);
770
771         verify(mockActorContext).executeOperationAsync(
772                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
773
774         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
775                 DeleteDataReply.SERIALIZABLE_CLASS);
776     }
777
778     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
779         Object... expReplies) throws Exception {
780         assertEquals("getReadyOperationFutures size", expReplies.length,
781                 proxy.getCohortFutures().size());
782
783         int i = 0;
784         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
785             assertNotNull("Ready operation Future is null", future);
786
787             Object expReply = expReplies[i++];
788             if(expReply instanceof ActorSelection) {
789                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
790                 assertEquals("Cohort actor path", expReply, actual);
791             } else {
792                 // Expecting exception.
793                 try {
794                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
795                     fail("Expected exception from ready operation Future");
796                 } catch(Exception e) {
797                     // Expected
798                 }
799             }
800         }
801     }
802
803     @SuppressWarnings("unchecked")
804     @Test
805     public void testReady() throws Exception {
806         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
807
808         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
809
810         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
811                 eq(actorSelection(actorRef)), eqSerializedReadData());
812
813         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
814                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
815
816         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
817                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
818
819         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
820                 READ_WRITE);
821
822         transactionProxy.read(TestModel.TEST_PATH);
823
824         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
825
826         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
827
828         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
829
830         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
831
832         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
833                 WriteDataReply.SERIALIZABLE_CLASS);
834
835         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
836     }
837
838     @SuppressWarnings("unchecked")
839     @Test
840     public void testReadyWithRecordingOperationFailure() throws Exception {
841         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
842
843         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
844
845         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
846                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
847
848         doReturn(Futures.failed(new TestException())).when(mockActorContext).
849                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
850
851         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
852                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
853
854         doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
855
856         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
857                 WRITE_ONLY);
858
859         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
860
861         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
862
863         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
864
865         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
866
867         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
868
869         verifyCohortFutures(proxy, TestException.class);
870
871         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
872                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
873     }
874
875     @SuppressWarnings("unchecked")
876     @Test
877     public void testReadyWithReplyFailure() throws Exception {
878         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
879
880         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
881
882         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
883                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
884
885         doReturn(Futures.failed(new TestException())).when(mockActorContext).
886                 executeOperationAsync(eq(actorSelection(actorRef)),
887                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
888
889         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
890                 WRITE_ONLY);
891
892         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
893
894         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
895
896         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
897
898         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
899
900         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
901                 MergeDataReply.SERIALIZABLE_CLASS);
902
903         verifyCohortFutures(proxy, TestException.class);
904     }
905
906     @Test
907     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
908
909         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
910                 mockActorContext).findPrimaryShardAsync(anyString());
911
912         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
913                 WRITE_ONLY);
914
915         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
916
917         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
918
919         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
920
921         transactionProxy.delete(TestModel.TEST_PATH);
922
923         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
924
925         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
926
927         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
928
929         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
930     }
931
932     @SuppressWarnings("unchecked")
933     @Test
934     public void testReadyWithInvalidReplyMessageType() throws Exception {
935         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
936
937         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
938
939         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
940                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
941
942         doReturn(Futures.successful(new Object())).when(mockActorContext).
943                 executeOperationAsync(eq(actorSelection(actorRef)),
944                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
945
946         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
947                 WRITE_ONLY);
948
949         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
950
951         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
952
953         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
954
955         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
956
957         verifyCohortFutures(proxy, IllegalArgumentException.class);
958     }
959
960     @Test
961     public void testGetIdentifier() {
962         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
963         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
964                 TransactionProxy.TransactionType.READ_ONLY);
965
966         Object id = transactionProxy.getIdentifier();
967         assertNotNull("getIdentifier returned null", id);
968         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
969     }
970
971     @SuppressWarnings("unchecked")
972     @Test
973     public void testClose() throws Exception{
974         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
975
976         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
977                 eq(actorSelection(actorRef)), eqSerializedReadData());
978
979         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
980                 READ_WRITE);
981
982         transactionProxy.read(TestModel.TEST_PATH);
983
984         transactionProxy.close();
985
986         verify(mockActorContext).sendOperationAsync(
987                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
988     }
989
990
991     /**
992      * Method to test a local Tx actor. The Tx paths are matched to decide if the
993      * Tx actor is local or not. This is done by mocking the Tx actor path
994      * and the caller paths and ensuring that the paths have the remote-address format
995      *
996      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
997      * the paths returned for the actors for all the tests are not qualified remote paths.
998      * Hence are treated as non-local/remote actors. In short, all tests except
999      * few below run for remote actors
1000      *
1001      * @throws Exception
1002      */
1003     @Test
1004     public void testLocalTxActorRead() throws Exception {
1005         ActorSystem actorSystem = getSystem();
1006         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1007
1008         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1009             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1010
1011         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1012             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1013
1014         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1015         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1016             .setTransactionId("txn-1")
1017             .setTransactionActorPath(actorPath)
1018             .build();
1019
1020         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1021             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1022                 eqCreateTransaction(memberName, READ_ONLY));
1023
1024         doReturn(true).when(mockActorContext).isLocalPath(actorPath);
1025
1026         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1027
1028         // negative test case with null as the reply
1029         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1030             any(ActorSelection.class), eqReadData());
1031
1032         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1033             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1034
1035         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1036
1037         // test case with node as read data reply
1038         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1039
1040         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1041             any(ActorSelection.class), eqReadData());
1042
1043         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1044
1045         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1046
1047         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1048
1049         // test for local data exists
1050         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1051             any(ActorSelection.class), eqDataExists());
1052
1053         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1054
1055         assertEquals("Exists response", true, exists);
1056     }
1057
1058     @Test
1059     public void testLocalTxActorWrite() throws Exception {
1060         ActorSystem actorSystem = getSystem();
1061         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1062
1063         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1064             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1065
1066         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1067             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1068
1069         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1070         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1071             .setTransactionId("txn-1")
1072             .setTransactionActorPath(actorPath)
1073             .build();
1074
1075         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1076         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1077                 eqCreateTransaction(memberName, WRITE_ONLY));
1078
1079         doReturn(true).when(mockActorContext).isLocalPath(actorPath);
1080
1081         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1082
1083         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1084             any(ActorSelection.class), eqWriteData(nodeToWrite));
1085
1086         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1087         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1088
1089         verify(mockActorContext).executeOperationAsync(
1090             any(ActorSelection.class), eqWriteData(nodeToWrite));
1091
1092         //testing local merge
1093         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1094             any(ActorSelection.class), eqMergeData(nodeToWrite));
1095
1096         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1097
1098         verify(mockActorContext).executeOperationAsync(
1099             any(ActorSelection.class), eqMergeData(nodeToWrite));
1100
1101
1102         //testing local delete
1103         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1104             any(ActorSelection.class), eqDeleteData());
1105
1106         transactionProxy.delete(TestModel.TEST_PATH);
1107
1108         verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1109
1110         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1111             WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1112
1113         // testing ready
1114         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1115             any(ActorSelection.class), isA(ReadyTransaction.class));
1116
1117         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1118
1119         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1120
1121         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1122
1123         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1124     }
1125 }