5e53b29db13f7fff0accf1397dc691a1f071d8a6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.typesafe.config.Config;
28 import com.typesafe.config.ConfigFactory;
29 import java.io.IOException;
30 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.BeforeClass;
35 import org.junit.Test;
36 import org.mockito.ArgumentMatcher;
37 import org.mockito.Mock;
38 import org.mockito.Mockito;
39 import org.mockito.MockitoAnnotations;
40 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
46 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
47 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
48 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
49 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
50 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
51 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
56 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
57 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
58 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
59 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
60 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
61 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
62 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
63 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
64 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
65 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import scala.concurrent.Await;
71 import scala.concurrent.Future;
72 import scala.concurrent.duration.Duration;
73
74 @SuppressWarnings("resource")
75 public class TransactionProxyTest {
76
77     @SuppressWarnings("serial")
78     static class TestException extends RuntimeException {
79     }
80
81     static interface Invoker {
82         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
83     }
84
85     private static ActorSystem system;
86
87     private final Configuration configuration = new MockConfiguration();
88
89     @Mock
90     private ActorContext mockActorContext;
91
92     private SchemaContext schemaContext;
93
94     @Mock
95     private ClusterWrapper mockClusterWrapper;
96
97     String memberName = "mock-member";
98
99     @BeforeClass
100     public static void setUpClass() throws IOException {
101
102         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
103                 put("akka.actor.default-dispatcher.type",
104                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
105                 withFallback(ConfigFactory.load());
106         system = ActorSystem.create("test", config);
107     }
108
109     @AfterClass
110     public static void tearDownClass() throws IOException {
111         JavaTestKit.shutdownActorSystem(system);
112         system = null;
113     }
114
115     @Before
116     public void setUp(){
117         MockitoAnnotations.initMocks(this);
118
119         schemaContext = TestModel.createTestContext();
120
121         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
122
123         doReturn(getSystem()).when(mockActorContext).getActorSystem();
124         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
125         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
126         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
127         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
128         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
129
130         ShardStrategyFactory.setConfiguration(configuration);
131     }
132
133     private ActorSystem getSystem() {
134         return system;
135     }
136
137     private CreateTransaction eqCreateTransaction(final String memberName,
138             final TransactionType type) {
139         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
140             @Override
141             public boolean matches(Object argument) {
142                 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
143                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
144                     return obj.getTransactionId().startsWith(memberName) &&
145                             obj.getTransactionType() == type.ordinal();
146                 }
147
148                 return false;
149             }
150         };
151
152         return argThat(matcher);
153     }
154
155     private DataExists eqSerializedDataExists() {
156         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
157             @Override
158             public boolean matches(Object argument) {
159                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
160                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
161             }
162         };
163
164         return argThat(matcher);
165     }
166
167     private DataExists eqDataExists() {
168         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
169             @Override
170             public boolean matches(Object argument) {
171                 return (argument instanceof DataExists) &&
172                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
173             }
174         };
175
176         return argThat(matcher);
177     }
178
179     private ReadData eqSerializedReadData() {
180         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
181             @Override
182             public boolean matches(Object argument) {
183                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
184                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
185             }
186         };
187
188         return argThat(matcher);
189     }
190
191     private ReadData eqReadData() {
192         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
193             @Override
194             public boolean matches(Object argument) {
195                 return (argument instanceof ReadData) &&
196                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
197             }
198         };
199
200         return argThat(matcher);
201     }
202
203     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
204         return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
205     }
206
207     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
208             final int transactionVersion) {
209         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
210             @Override
211             public boolean matches(Object argument) {
212                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
213                         WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
214                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
215                            ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
216
217                     WriteData obj = WriteData.fromSerializable(argument);
218                     return obj.getPath().equals(TestModel.TEST_PATH) &&
219                            obj.getData().equals(nodeToWrite);
220                 }
221
222                 return false;
223             }
224         };
225
226         return argThat(matcher);
227     }
228
229     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
230         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
231             @Override
232             public boolean matches(Object argument) {
233                 if(argument instanceof WriteData) {
234                     WriteData obj = (WriteData) argument;
235                     return obj.getPath().equals(TestModel.TEST_PATH) &&
236                         obj.getData().equals(nodeToWrite);
237                 }
238                 return false;
239             }
240         };
241
242         return argThat(matcher);
243     }
244
245     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
246         return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
247     }
248
249     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
250             final int transactionVersion) {
251         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
252             @Override
253             public boolean matches(Object argument) {
254                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
255                         MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
256                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
257                            ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
258
259                     MergeData obj = MergeData.fromSerializable(argument);
260                     return obj.getPath().equals(TestModel.TEST_PATH) &&
261                            obj.getData().equals(nodeToWrite);
262                 }
263
264                 return false;
265             }
266         };
267
268         return argThat(matcher);
269     }
270
271     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
272         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
273             @Override
274             public boolean matches(Object argument) {
275                 if(argument instanceof MergeData) {
276                     MergeData obj = ((MergeData) argument);
277                     return obj.getPath().equals(TestModel.TEST_PATH) &&
278                         obj.getData().equals(nodeToWrite);
279                 }
280
281                return false;
282             }
283         };
284
285         return argThat(matcher);
286     }
287
288     private DeleteData eqSerializedDeleteData() {
289         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
290             @Override
291             public boolean matches(Object argument) {
292                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
293                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
294             }
295         };
296
297         return argThat(matcher);
298     }
299
300         private DeleteData eqDeleteData() {
301         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
302             @Override
303             public boolean matches(Object argument) {
304                 return argument instanceof DeleteData &&
305                     ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
306             }
307         };
308
309         return argThat(matcher);
310     }
311
312     private Future<Object> readySerializedTxReply(String path) {
313         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
314     }
315
316     private Future<Object> readyTxReply(String path) {
317         return Futures.successful((Object)new ReadyTransactionReply(path));
318     }
319
320     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
321             short transactionVersion) {
322         return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
323     }
324
325     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
326         return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
327     }
328
329     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
330         return Futures.successful(new ReadDataReply(data));
331     }
332
333     private Future<Object> dataExistsSerializedReply(boolean exists) {
334         return Futures.successful(new DataExistsReply(exists).toSerializable());
335     }
336
337     private Future<DataExistsReply> dataExistsReply(boolean exists) {
338         return Futures.successful(new DataExistsReply(exists));
339     }
340
341     private Future<Object> writeSerializedDataReply(short version) {
342         return Futures.successful(new WriteDataReply().toSerializable(version));
343     }
344
345     private Future<Object> writeSerializedDataReply() {
346         return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
347     }
348
349     private Future<WriteDataReply> writeDataReply() {
350         return Futures.successful(new WriteDataReply());
351     }
352
353     private Future<Object> mergeSerializedDataReply(short version) {
354         return Futures.successful(new MergeDataReply().toSerializable(version));
355     }
356
357     private Future<Object> mergeSerializedDataReply() {
358         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
359     }
360
361     private Future<MergeDataReply> mergeDataReply() {
362         return Futures.successful(new MergeDataReply());
363     }
364
365     private Future<Object> deleteSerializedDataReply() {
366         return Futures.successful(new DeleteDataReply().toSerializable());
367     }
368
369     private Future<DeleteDataReply> deleteDataReply() {
370         return Futures.successful(new DeleteDataReply());
371     }
372
373     private ActorSelection actorSelection(ActorRef actorRef) {
374         return getSystem().actorSelection(actorRef.path());
375     }
376
377     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
378         return CreateTransactionReply.newBuilder()
379             .setTransactionActorPath(actorRef.path().toString())
380             .setTransactionId("txn-1")
381             .setMessageVersion(transactionVersion)
382             .build();
383     }
384
385     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
386             TransactionType type, int transactionVersion) {
387         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
388         doReturn(actorSystem.actorSelection(actorRef.path())).
389                 when(mockActorContext).actorSelection(actorRef.path().toString());
390
391         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
392                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
393
394         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
395                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
396                         eqCreateTransaction(memberName, type));
397
398         return actorRef;
399     }
400
401     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
402         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
403     }
404
405
406     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
407             throws Throwable {
408
409         try {
410             future.checkedGet(5, TimeUnit.SECONDS);
411             fail("Expected ReadFailedException");
412         } catch(ReadFailedException e) {
413             throw e.getCause();
414         }
415     }
416
417     @Test
418     public void testRead() throws Exception {
419         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
420
421         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
422                 READ_ONLY);
423
424         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
425                 eq(actorSelection(actorRef)), eqSerializedReadData());
426
427         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
428                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
429
430         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
431
432         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
433
434         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
435                 eq(actorSelection(actorRef)), eqSerializedReadData());
436
437         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
438
439         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
440
441         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
442     }
443
444     @Test(expected = ReadFailedException.class)
445     public void testReadWithInvalidReplyMessageType() throws Exception {
446         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
447
448         doReturn(Futures.successful(new Object())).when(mockActorContext).
449                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
450
451         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
452                 READ_ONLY);
453
454         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
455     }
456
457     @Test(expected = TestException.class)
458     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
459         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
460
461         doReturn(Futures.failed(new TestException())).when(mockActorContext).
462                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
463
464         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
465                 READ_ONLY);
466
467         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
468     }
469
470     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
471             throws Throwable {
472         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
473
474         if (exToThrow instanceof PrimaryNotFoundException) {
475             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
476         } else {
477             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
478                     when(mockActorContext).findPrimaryShardAsync(anyString());
479         }
480
481         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
482                 any(ActorSelection.class), any());
483
484         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
485
486         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
487     }
488
489     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
490         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
491             @Override
492             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
493                 return proxy.read(TestModel.TEST_PATH);
494             }
495         });
496     }
497
498     @Test(expected = PrimaryNotFoundException.class)
499     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
500         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
501     }
502
503     @Test(expected = TimeoutException.class)
504     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
505         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
506                 new Exception("reason")));
507     }
508
509     @Test(expected = TestException.class)
510     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
511         testReadWithExceptionOnInitialCreateTransaction(new TestException());
512     }
513
514     @Test(expected = TestException.class)
515     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
516         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
517
518         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
519
520         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
521                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
522
523         doReturn(Futures.failed(new TestException())).when(mockActorContext).
524                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
525
526         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
527                 eq(actorSelection(actorRef)), eqSerializedReadData());
528
529         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
530                 READ_WRITE);
531
532         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
533
534         transactionProxy.delete(TestModel.TEST_PATH);
535
536         try {
537             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
538         } finally {
539             verify(mockActorContext, times(0)).executeOperationAsync(
540                     eq(actorSelection(actorRef)), eqSerializedReadData());
541         }
542     }
543
544     @Test
545     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
546         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
547
548         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
549
550         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
551                 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
552
553         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
554                 eq(actorSelection(actorRef)), eqSerializedReadData());
555
556         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
557                 READ_WRITE);
558
559         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
560
561         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
562                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
563
564         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
565
566         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
567     }
568
569     @Test(expected=IllegalStateException.class)
570     public void testReadPreConditionCheck() {
571
572         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
573                 WRITE_ONLY);
574
575         transactionProxy.read(TestModel.TEST_PATH);
576     }
577
578     @Test(expected=IllegalArgumentException.class)
579     public void testInvalidCreateTransactionReply() throws Throwable {
580         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
581
582         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
583             actorSelection(actorRef.path().toString());
584
585         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
586             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
587
588         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
589             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
590
591         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
592
593         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
594     }
595
596     @Test
597     public void testExists() throws Exception {
598         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
599
600         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
601                 READ_ONLY);
602
603         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
604                 eq(actorSelection(actorRef)), eqSerializedDataExists());
605
606         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
607
608         assertEquals("Exists response", false, exists);
609
610         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
611                 eq(actorSelection(actorRef)), eqSerializedDataExists());
612
613         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
614
615         assertEquals("Exists response", true, exists);
616     }
617
618     @Test(expected = PrimaryNotFoundException.class)
619     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
620         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
621             @Override
622             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
623                 return proxy.exists(TestModel.TEST_PATH);
624             }
625         });
626     }
627
628     @Test(expected = ReadFailedException.class)
629     public void testExistsWithInvalidReplyMessageType() throws Exception {
630         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
631
632         doReturn(Futures.successful(new Object())).when(mockActorContext).
633                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
634
635         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
636                 READ_ONLY);
637
638         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
639     }
640
641     @Test(expected = TestException.class)
642     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
643         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
644
645         doReturn(Futures.failed(new TestException())).when(mockActorContext).
646                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
647
648         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
649                 READ_ONLY);
650
651         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
652     }
653
654     @Test(expected = TestException.class)
655     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
656         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
657
658         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
659
660         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
661                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
662
663         doReturn(Futures.failed(new TestException())).when(mockActorContext).
664                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
665
666         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
667                 eq(actorSelection(actorRef)), eqSerializedDataExists());
668
669         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
670                 READ_WRITE);
671
672         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
673
674         transactionProxy.delete(TestModel.TEST_PATH);
675
676         try {
677             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
678         } finally {
679             verify(mockActorContext, times(0)).executeOperationAsync(
680                     eq(actorSelection(actorRef)), eqSerializedDataExists());
681         }
682     }
683
684     @Test
685     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
686         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
687
688         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
689
690         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
691                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
692
693         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
694                 eq(actorSelection(actorRef)), eqSerializedDataExists());
695
696         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
697                 READ_WRITE);
698
699         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
700
701         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
702
703         assertEquals("Exists response", true, exists);
704     }
705
706     @Test(expected=IllegalStateException.class)
707     public void testExistsPreConditionCheck() {
708
709         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
710                 WRITE_ONLY);
711
712         transactionProxy.exists(TestModel.TEST_PATH);
713     }
714
715     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
716             Class<?>... expResultTypes) throws Exception {
717         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
718
719         int i = 0;
720         for( Future<Object> future: futures) {
721             assertNotNull("Recording operation Future is null", future);
722
723             Class<?> expResultType = expResultTypes[i++];
724             if(Throwable.class.isAssignableFrom(expResultType)) {
725                 try {
726                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
727                     fail("Expected exception from recording operation Future");
728                 } catch(Exception e) {
729                     // Expected
730                 }
731             } else {
732                 assertEquals("Recording operation Future result type", expResultType,
733                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
734             }
735         }
736     }
737
738     @Test
739     public void testWrite() throws Exception {
740         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
741
742         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
743
744         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
745                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
746
747         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
748                 WRITE_ONLY);
749
750         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
751
752         verify(mockActorContext).executeOperationAsync(
753                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
754
755         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
756                 WriteDataReply.class);
757     }
758
759     @Test(expected=IllegalStateException.class)
760     public void testWritePreConditionCheck() {
761
762         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
763                 READ_ONLY);
764
765         transactionProxy.write(TestModel.TEST_PATH,
766                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
767     }
768
769     @Test(expected=IllegalStateException.class)
770     public void testWriteAfterReadyPreConditionCheck() {
771
772         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
773                 WRITE_ONLY);
774
775         transactionProxy.ready();
776
777         transactionProxy.write(TestModel.TEST_PATH,
778                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
779     }
780
781     @Test
782     public void testMerge() throws Exception {
783         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
784
785         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
786
787         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
788                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
789
790         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
791
792         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
793
794         verify(mockActorContext).executeOperationAsync(
795                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
796
797         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
798                 MergeDataReply.class);
799     }
800
801     @Test
802     public void testDelete() throws Exception {
803         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
804
805         doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
806                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
807
808         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
809                 WRITE_ONLY);
810
811         transactionProxy.delete(TestModel.TEST_PATH);
812
813         verify(mockActorContext).executeOperationAsync(
814                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
815
816         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
817                 DeleteDataReply.SERIALIZABLE_CLASS);
818     }
819
820     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
821         Object... expReplies) throws Exception {
822         assertEquals("getReadyOperationFutures size", expReplies.length,
823                 proxy.getCohortFutures().size());
824
825         int i = 0;
826         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
827             assertNotNull("Ready operation Future is null", future);
828
829             Object expReply = expReplies[i++];
830             if(expReply instanceof ActorSelection) {
831                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
832                 assertEquals("Cohort actor path", expReply, actual);
833             } else {
834                 // Expecting exception.
835                 try {
836                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
837                     fail("Expected exception from ready operation Future");
838                 } catch(Exception e) {
839                     // Expected
840                 }
841             }
842         }
843     }
844
845     @Test
846     public void testReady() throws Exception {
847         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
848
849         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
850
851         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
852                 eq(actorSelection(actorRef)), eqSerializedReadData());
853
854         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
855                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
856
857         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
858                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
859
860         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
861                 READ_WRITE);
862
863         transactionProxy.read(TestModel.TEST_PATH);
864
865         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
866
867         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
868
869         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
870
871         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
872
873         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
874                 WriteDataReply.class);
875
876         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
877     }
878
879     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
880         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
881                 READ_WRITE, version);
882
883         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
884
885         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
886                 eq(actorSelection(actorRef)), eqSerializedReadData());
887
888         doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
889                 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
890
891         doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
892                 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
893
894         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
895                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
896
897         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
898                 eq(actorRef.path().toString()));
899
900         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
901
902         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
903                 get(5, TimeUnit.SECONDS);
904
905         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
906         assertEquals("Response NormalizedNode", testNode, readOptional.get());
907
908         transactionProxy.write(TestModel.TEST_PATH, testNode);
909
910         transactionProxy.merge(TestModel.TEST_PATH, testNode);
911
912         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
913
914         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
915
916         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
917
918         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
919                 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
920
921         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
922
923         return actorRef;
924     }
925
926     @Test
927     public void testCompatibilityWithBaseHeliumVersion() throws Exception {
928         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
929
930         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
931                 eq(actorRef.path().toString()));
932     }
933
934     @Test
935     public void testCompatibilityWithHeliumR1Version() throws Exception {
936         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
937
938         verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
939                 eq(actorRef.path().toString()));
940     }
941
942     @Test
943     public void testReadyWithRecordingOperationFailure() throws Exception {
944         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
945
946         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
947
948         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
949                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
950
951         doReturn(Futures.failed(new TestException())).when(mockActorContext).
952                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
953
954         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
955                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
956
957         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
958
959         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
960                 WRITE_ONLY);
961
962         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
963
964         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
965
966         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
967
968         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
969
970         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
971
972         verifyCohortFutures(proxy, TestException.class);
973
974         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
975                 MergeDataReply.class, TestException.class);
976     }
977
978     @Test
979     public void testReadyWithReplyFailure() throws Exception {
980         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
981
982         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
983
984         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
985                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
986
987         doReturn(Futures.failed(new TestException())).when(mockActorContext).
988                 executeOperationAsync(eq(actorSelection(actorRef)),
989                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
990
991         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
992                 WRITE_ONLY);
993
994         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
995
996         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
997
998         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
999
1000         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1001
1002         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1003                 MergeDataReply.class);
1004
1005         verifyCohortFutures(proxy, TestException.class);
1006     }
1007
1008     @Test
1009     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1010
1011         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1012                 mockActorContext).findPrimaryShardAsync(anyString());
1013
1014         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1015                 WRITE_ONLY);
1016
1017         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1018
1019         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1020
1021         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1022
1023         transactionProxy.delete(TestModel.TEST_PATH);
1024
1025         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1026
1027         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1028
1029         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1030
1031         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
1032     }
1033
1034     @Test
1035     public void testReadyWithInvalidReplyMessageType() throws Exception {
1036         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1037
1038         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1039
1040         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1041                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1042
1043         doReturn(Futures.successful(new Object())).when(mockActorContext).
1044                 executeOperationAsync(eq(actorSelection(actorRef)),
1045                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1046
1047         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1048                 WRITE_ONLY);
1049
1050         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1051
1052         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1053
1054         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1055
1056         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1057
1058         verifyCohortFutures(proxy, IllegalArgumentException.class);
1059     }
1060
1061     @Test
1062     public void testGetIdentifier() {
1063         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1064         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1065                 TransactionProxy.TransactionType.READ_ONLY);
1066
1067         Object id = transactionProxy.getIdentifier();
1068         assertNotNull("getIdentifier returned null", id);
1069         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1070     }
1071
1072     @Test
1073     public void testClose() throws Exception{
1074         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1075
1076         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1077                 eq(actorSelection(actorRef)), eqSerializedReadData());
1078
1079         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1080                 READ_WRITE);
1081
1082         transactionProxy.read(TestModel.TEST_PATH);
1083
1084         transactionProxy.close();
1085
1086         verify(mockActorContext).sendOperationAsync(
1087                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1088     }
1089
1090
1091     /**
1092      * Method to test a local Tx actor. The Tx paths are matched to decide if the
1093      * Tx actor is local or not. This is done by mocking the Tx actor path
1094      * and the caller paths and ensuring that the paths have the remote-address format
1095      *
1096      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1097      * the paths returned for the actors for all the tests are not qualified remote paths.
1098      * Hence are treated as non-local/remote actors. In short, all tests except
1099      * few below run for remote actors
1100      *
1101      * @throws Exception
1102      */
1103     @Test
1104     public void testLocalTxActorRead() throws Exception {
1105         ActorSystem actorSystem = getSystem();
1106         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1107
1108         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1109             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1110
1111         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1112             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1113
1114         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1115         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1116             .setTransactionId("txn-1")
1117             .setTransactionActorPath(actorPath)
1118             .build();
1119
1120         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1121             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1122                 eqCreateTransaction(memberName, READ_ONLY));
1123
1124         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1125
1126         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1127
1128         // negative test case with null as the reply
1129         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1130             any(ActorSelection.class), eqReadData());
1131
1132         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1133             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1134
1135         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1136
1137         // test case with node as read data reply
1138         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1139
1140         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1141             any(ActorSelection.class), eqReadData());
1142
1143         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1144
1145         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1146
1147         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1148
1149         // test for local data exists
1150         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1151             any(ActorSelection.class), eqDataExists());
1152
1153         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1154
1155         assertEquals("Exists response", true, exists);
1156     }
1157
1158     @Test
1159     public void testLocalTxActorWrite() throws Exception {
1160         ActorSystem actorSystem = getSystem();
1161         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1162
1163         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1164             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1165
1166         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1167             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1168
1169         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1170         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1171             .setTransactionId("txn-1")
1172             .setTransactionActorPath(actorPath)
1173             .build();
1174
1175         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1176         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1177                 eqCreateTransaction(memberName, WRITE_ONLY));
1178
1179         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1180
1181         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182
1183         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1184             any(ActorSelection.class), eqWriteData(nodeToWrite));
1185
1186         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1187         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1188
1189         verify(mockActorContext).executeOperationAsync(
1190             any(ActorSelection.class), eqWriteData(nodeToWrite));
1191
1192         //testing local merge
1193         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1194             any(ActorSelection.class), eqMergeData(nodeToWrite));
1195
1196         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1197
1198         verify(mockActorContext).executeOperationAsync(
1199             any(ActorSelection.class), eqMergeData(nodeToWrite));
1200
1201
1202         //testing local delete
1203         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1204             any(ActorSelection.class), eqDeleteData());
1205
1206         transactionProxy.delete(TestModel.TEST_PATH);
1207
1208         verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1209
1210         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1211             WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1212
1213         // testing ready
1214         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1215             any(ActorSelection.class), isA(ReadyTransaction.class));
1216
1217         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1218
1219         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1220
1221         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1222
1223         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1224     }
1225 }