6d80bbb5b158eb2d962a87c0bb6bf08654be2bf8
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
18 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSelection;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.dispatch.Futures;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.typesafe.config.Config;
29 import com.typesafe.config.ConfigFactory;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.AfterClass;
34 import org.junit.Assert;
35 import org.junit.Before;
36 import org.junit.BeforeClass;
37 import org.junit.Test;
38 import org.mockito.ArgumentMatcher;
39 import org.mockito.Mock;
40 import org.mockito.Mockito;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
43 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
45 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
48 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
50 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
51 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
52 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
58 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
61 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
62 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
64 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
67 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
73 import scala.concurrent.Await;
74 import scala.concurrent.Future;
75 import scala.concurrent.duration.Duration;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     private static ActorSystem system;
89
90     private final Configuration configuration = new MockConfiguration();
91
92     @Mock
93     private ActorContext mockActorContext;
94
95     private SchemaContext schemaContext;
96
97     @Mock
98     private ClusterWrapper mockClusterWrapper;
99
100     String memberName = "mock-member";
101
102     @BeforeClass
103     public static void setUpClass() throws IOException {
104
105         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
106                 put("akka.actor.default-dispatcher.type",
107                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
108                 withFallback(ConfigFactory.load());
109         system = ActorSystem.create("test", config);
110     }
111
112     @AfterClass
113     public static void tearDownClass() throws IOException {
114         JavaTestKit.shutdownActorSystem(system);
115         system = null;
116     }
117
118     @Before
119     public void setUp(){
120         MockitoAnnotations.initMocks(this);
121
122         schemaContext = TestModel.createTestContext();
123
124         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
125
126         doReturn(getSystem()).when(mockActorContext).getActorSystem();
127         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
128         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
129         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
130         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
131         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
132         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
133
134         ShardStrategyFactory.setConfiguration(configuration);
135     }
136
137     private ActorSystem getSystem() {
138         return system;
139     }
140
141     private CreateTransaction eqCreateTransaction(final String memberName,
142             final TransactionType type) {
143         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
144             @Override
145             public boolean matches(Object argument) {
146                 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
147                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
148                     return obj.getTransactionId().startsWith(memberName) &&
149                             obj.getTransactionType() == type.ordinal();
150                 }
151
152                 return false;
153             }
154         };
155
156         return argThat(matcher);
157     }
158
159     private DataExists eqSerializedDataExists() {
160         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
161             @Override
162             public boolean matches(Object argument) {
163                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
164                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
165             }
166         };
167
168         return argThat(matcher);
169     }
170
171     private DataExists eqDataExists() {
172         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
173             @Override
174             public boolean matches(Object argument) {
175                 return (argument instanceof DataExists) &&
176                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
177             }
178         };
179
180         return argThat(matcher);
181     }
182
183     private ReadData eqSerializedReadData() {
184         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
185             @Override
186             public boolean matches(Object argument) {
187                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
188                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
189             }
190         };
191
192         return argThat(matcher);
193     }
194
195     private ReadData eqReadData() {
196         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
197             @Override
198             public boolean matches(Object argument) {
199                 return (argument instanceof ReadData) &&
200                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
201             }
202         };
203
204         return argThat(matcher);
205     }
206
207     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
208         return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
209     }
210
211     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
212             final int transactionVersion) {
213         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
214             @Override
215             public boolean matches(Object argument) {
216                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
217                         WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
218                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
219                            ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
220
221                     WriteData obj = WriteData.fromSerializable(argument);
222                     return obj.getPath().equals(TestModel.TEST_PATH) &&
223                            obj.getData().equals(nodeToWrite);
224                 }
225
226                 return false;
227             }
228         };
229
230         return argThat(matcher);
231     }
232
233     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
234         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
235             @Override
236             public boolean matches(Object argument) {
237                 if(argument instanceof WriteData) {
238                     WriteData obj = (WriteData) argument;
239                     return obj.getPath().equals(TestModel.TEST_PATH) &&
240                         obj.getData().equals(nodeToWrite);
241                 }
242                 return false;
243             }
244         };
245
246         return argThat(matcher);
247     }
248
249     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
250         return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
251     }
252
253     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
254             final int transactionVersion) {
255         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
256             @Override
257             public boolean matches(Object argument) {
258                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
259                         MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
260                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
261                            ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
262
263                     MergeData obj = MergeData.fromSerializable(argument);
264                     return obj.getPath().equals(TestModel.TEST_PATH) &&
265                            obj.getData().equals(nodeToWrite);
266                 }
267
268                 return false;
269             }
270         };
271
272         return argThat(matcher);
273     }
274
275     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
276         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
277             @Override
278             public boolean matches(Object argument) {
279                 if(argument instanceof MergeData) {
280                     MergeData obj = ((MergeData) argument);
281                     return obj.getPath().equals(TestModel.TEST_PATH) &&
282                         obj.getData().equals(nodeToWrite);
283                 }
284
285                return false;
286             }
287         };
288
289         return argThat(matcher);
290     }
291
292     private DeleteData eqSerializedDeleteData() {
293         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
294             @Override
295             public boolean matches(Object argument) {
296                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
297                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
298             }
299         };
300
301         return argThat(matcher);
302     }
303
304         private DeleteData eqDeleteData() {
305         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
306             @Override
307             public boolean matches(Object argument) {
308                 return argument instanceof DeleteData &&
309                     ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
310             }
311         };
312
313         return argThat(matcher);
314     }
315
316     private Future<Object> readySerializedTxReply(String path) {
317         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
318     }
319
320     private Future<Object> readyTxReply(String path) {
321         return Futures.successful((Object)new ReadyTransactionReply(path));
322     }
323
324     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
325             short transactionVersion) {
326         return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
327     }
328
329     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
330         return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
331     }
332
333     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
334         return Futures.successful(new ReadDataReply(data));
335     }
336
337     private Future<Object> dataExistsSerializedReply(boolean exists) {
338         return Futures.successful(new DataExistsReply(exists).toSerializable());
339     }
340
341     private Future<DataExistsReply> dataExistsReply(boolean exists) {
342         return Futures.successful(new DataExistsReply(exists));
343     }
344
345     private Future<Object> writeSerializedDataReply(short version) {
346         return Futures.successful(new WriteDataReply().toSerializable(version));
347     }
348
349     private Future<Object> writeSerializedDataReply() {
350         return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
351     }
352
353     private Future<WriteDataReply> writeDataReply() {
354         return Futures.successful(new WriteDataReply());
355     }
356
357     private Future<Object> mergeSerializedDataReply(short version) {
358         return Futures.successful(new MergeDataReply().toSerializable(version));
359     }
360
361     private Future<Object> mergeSerializedDataReply() {
362         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
363     }
364
365     private Future<Object> incompleteFuture(){
366         return mock(Future.class);
367     }
368
369     private Future<MergeDataReply> mergeDataReply() {
370         return Futures.successful(new MergeDataReply());
371     }
372
373     private Future<Object> deleteSerializedDataReply(short version) {
374         return Futures.successful(new DeleteDataReply().toSerializable(version));
375     }
376
377     private Future<Object> deleteSerializedDataReply() {
378         return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
379     }
380
381     private Future<DeleteDataReply> deleteDataReply() {
382         return Futures.successful(new DeleteDataReply());
383     }
384
385     private ActorSelection actorSelection(ActorRef actorRef) {
386         return getSystem().actorSelection(actorRef.path());
387     }
388
389     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
390         return CreateTransactionReply.newBuilder()
391             .setTransactionActorPath(actorRef.path().toString())
392             .setTransactionId("txn-1")
393             .setMessageVersion(transactionVersion)
394             .build();
395     }
396
397     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
398             TransactionType type, int transactionVersion) {
399         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
400         doReturn(actorSystem.actorSelection(actorRef.path())).
401                 when(mockActorContext).actorSelection(actorRef.path().toString());
402
403         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
404                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
405
406         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
407                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
408                         eqCreateTransaction(memberName, type));
409
410         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
411
412         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
413
414         return actorRef;
415     }
416
417     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
418         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
419     }
420
421
422     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
423             throws Throwable {
424
425         try {
426             future.checkedGet(5, TimeUnit.SECONDS);
427             fail("Expected ReadFailedException");
428         } catch(ReadFailedException e) {
429             throw e.getCause();
430         }
431     }
432
433     @Test
434     public void testRead() throws Exception {
435         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
436
437         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
438                 READ_ONLY);
439
440         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
441                 eq(actorSelection(actorRef)), eqSerializedReadData());
442
443         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
444                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
445
446         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
447
448         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
449
450         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
451                 eq(actorSelection(actorRef)), eqSerializedReadData());
452
453         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
454
455         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
456
457         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
458     }
459
460     @Test(expected = ReadFailedException.class)
461     public void testReadWithInvalidReplyMessageType() throws Exception {
462         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
463
464         doReturn(Futures.successful(new Object())).when(mockActorContext).
465                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
466
467         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
468                 READ_ONLY);
469
470         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
471     }
472
473     @Test(expected = TestException.class)
474     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
475         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
476
477         doReturn(Futures.failed(new TestException())).when(mockActorContext).
478                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
479
480         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
481                 READ_ONLY);
482
483         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
484     }
485
486     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
487             throws Throwable {
488         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
489
490         if (exToThrow instanceof PrimaryNotFoundException) {
491             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
492         } else {
493             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
494                     when(mockActorContext).findPrimaryShardAsync(anyString());
495         }
496
497         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
498                 any(ActorSelection.class), any());
499
500         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
501
502         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
503     }
504
505     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
506         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
507             @Override
508             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
509                 return proxy.read(TestModel.TEST_PATH);
510             }
511         });
512     }
513
514     @Test(expected = PrimaryNotFoundException.class)
515     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
516         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
517     }
518
519     @Test(expected = TimeoutException.class)
520     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
521         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
522                 new Exception("reason")));
523     }
524
525     @Test(expected = TestException.class)
526     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
527         testReadWithExceptionOnInitialCreateTransaction(new TestException());
528     }
529
530     @Test(expected = TestException.class)
531     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
532         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
533
534         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
535
536         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
537                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
538
539         doReturn(Futures.failed(new TestException())).when(mockActorContext).
540                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
541
542         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
543                 eq(actorSelection(actorRef)), eqSerializedReadData());
544
545         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
546                 READ_WRITE);
547
548         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
549
550         transactionProxy.delete(TestModel.TEST_PATH);
551
552         try {
553             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
554         } finally {
555             verify(mockActorContext, times(0)).executeOperationAsync(
556                     eq(actorSelection(actorRef)), eqSerializedReadData());
557         }
558     }
559
560     @Test
561     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
562         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
563
564         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
565
566         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
567                 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
568
569         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
570                 eq(actorSelection(actorRef)), eqSerializedReadData());
571
572         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
573                 READ_WRITE);
574
575         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
576
577         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
578                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
579
580         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
581
582         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
583     }
584
585     @Test(expected=IllegalStateException.class)
586     public void testReadPreConditionCheck() {
587
588         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
589                 WRITE_ONLY);
590
591         transactionProxy.read(TestModel.TEST_PATH);
592     }
593
594     @Test(expected=IllegalArgumentException.class)
595     public void testInvalidCreateTransactionReply() throws Throwable {
596         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
597
598         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
599             actorSelection(actorRef.path().toString());
600
601         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
602             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
603
604         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
605             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
606
607         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
608
609         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
610     }
611
612     @Test
613     public void testExists() throws Exception {
614         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
615
616         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
617                 READ_ONLY);
618
619         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
620                 eq(actorSelection(actorRef)), eqSerializedDataExists());
621
622         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
623
624         assertEquals("Exists response", false, exists);
625
626         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
627                 eq(actorSelection(actorRef)), eqSerializedDataExists());
628
629         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
630
631         assertEquals("Exists response", true, exists);
632     }
633
634     @Test(expected = PrimaryNotFoundException.class)
635     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
636         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
637             @Override
638             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
639                 return proxy.exists(TestModel.TEST_PATH);
640             }
641         });
642     }
643
644     @Test(expected = ReadFailedException.class)
645     public void testExistsWithInvalidReplyMessageType() throws Exception {
646         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
647
648         doReturn(Futures.successful(new Object())).when(mockActorContext).
649                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
650
651         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
652                 READ_ONLY);
653
654         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
655     }
656
657     @Test(expected = TestException.class)
658     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
659         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
660
661         doReturn(Futures.failed(new TestException())).when(mockActorContext).
662                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
663
664         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
665                 READ_ONLY);
666
667         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
668     }
669
670     @Test(expected = TestException.class)
671     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
672         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
673
674         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
675
676         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
677                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
678
679         doReturn(Futures.failed(new TestException())).when(mockActorContext).
680                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
681
682         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
683                 eq(actorSelection(actorRef)), eqSerializedDataExists());
684
685         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
686                 READ_WRITE);
687
688         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
689
690         transactionProxy.delete(TestModel.TEST_PATH);
691
692         try {
693             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
694         } finally {
695             verify(mockActorContext, times(0)).executeOperationAsync(
696                     eq(actorSelection(actorRef)), eqSerializedDataExists());
697         }
698     }
699
700     @Test
701     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
702         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
703
704         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
705
706         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
707                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
708
709         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
710                 eq(actorSelection(actorRef)), eqSerializedDataExists());
711
712         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
713                 READ_WRITE);
714
715         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
716
717         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
718
719         assertEquals("Exists response", true, exists);
720     }
721
722     @Test(expected=IllegalStateException.class)
723     public void testExistsPreConditionCheck() {
724
725         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
726                 WRITE_ONLY);
727
728         transactionProxy.exists(TestModel.TEST_PATH);
729     }
730
731     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
732             Class<?>... expResultTypes) throws Exception {
733         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
734
735         int i = 0;
736         for( Future<Object> future: futures) {
737             assertNotNull("Recording operation Future is null", future);
738
739             Class<?> expResultType = expResultTypes[i++];
740             if(Throwable.class.isAssignableFrom(expResultType)) {
741                 try {
742                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
743                     fail("Expected exception from recording operation Future");
744                 } catch(Exception e) {
745                     // Expected
746                 }
747             } else {
748                 assertEquals("Recording operation Future result type", expResultType,
749                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
750             }
751         }
752     }
753
754     @Test
755     public void testWrite() throws Exception {
756         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
757
758         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
759
760         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
761                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
762
763         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
764                 WRITE_ONLY);
765
766         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
767
768         verify(mockActorContext).executeOperationAsync(
769                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
770
771         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
772                 WriteDataReply.class);
773     }
774
775     @Test(expected=IllegalStateException.class)
776     public void testWritePreConditionCheck() {
777
778         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
779                 READ_ONLY);
780
781         transactionProxy.write(TestModel.TEST_PATH,
782                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
783     }
784
785     @Test(expected=IllegalStateException.class)
786     public void testWriteAfterReadyPreConditionCheck() {
787
788         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
789                 WRITE_ONLY);
790
791         transactionProxy.ready();
792
793         transactionProxy.write(TestModel.TEST_PATH,
794                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
795     }
796
797     @Test
798     public void testMerge() throws Exception {
799         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
800
801         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
802
803         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
804                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
805
806         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
807
808         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
809
810         verify(mockActorContext).executeOperationAsync(
811                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
812
813         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
814                 MergeDataReply.class);
815     }
816
817     @Test
818     public void testDelete() throws Exception {
819         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
820
821         doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
822                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
823
824         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
825                 WRITE_ONLY);
826
827         transactionProxy.delete(TestModel.TEST_PATH);
828
829         verify(mockActorContext).executeOperationAsync(
830                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
831
832         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
833                 DeleteDataReply.class);
834     }
835
836     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
837         Object... expReplies) throws Exception {
838         assertEquals("getReadyOperationFutures size", expReplies.length,
839                 proxy.getCohortFutures().size());
840
841         int i = 0;
842         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
843             assertNotNull("Ready operation Future is null", future);
844
845             Object expReply = expReplies[i++];
846             if(expReply instanceof ActorSelection) {
847                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
848                 assertEquals("Cohort actor path", expReply, actual);
849             } else {
850                 // Expecting exception.
851                 try {
852                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
853                     fail("Expected exception from ready operation Future");
854                 } catch(Exception e) {
855                     // Expected
856                 }
857             }
858         }
859     }
860
861     @Test
862     public void testReady() throws Exception {
863         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
864
865         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
866
867         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
868                 eq(actorSelection(actorRef)), eqSerializedReadData());
869
870         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
871                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
872
873         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
874                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
875
876         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
877                 READ_WRITE);
878
879         transactionProxy.read(TestModel.TEST_PATH);
880
881         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
882
883         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
884
885         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
886
887         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
888
889         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
890                 WriteDataReply.class);
891
892         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
893     }
894
895     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
896         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
897                 READ_WRITE, version);
898
899         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
900
901         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
902                 eq(actorSelection(actorRef)), eqSerializedReadData());
903
904         doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
905                 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
906
907         doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
908                 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
909
910         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
911                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
912
913         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
914                 eq(actorRef.path().toString()));
915
916         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
917
918         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
919                 get(5, TimeUnit.SECONDS);
920
921         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
922         assertEquals("Response NormalizedNode", testNode, readOptional.get());
923
924         transactionProxy.write(TestModel.TEST_PATH, testNode);
925
926         transactionProxy.merge(TestModel.TEST_PATH, testNode);
927
928         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
929
930         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
931
932         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
933
934         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
935                 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
936
937         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
938
939         return actorRef;
940     }
941
942     @Test
943     public void testCompatibilityWithBaseHeliumVersion() throws Exception {
944         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
945
946         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
947                 eq(actorRef.path().toString()));
948     }
949
950     @Test
951     public void testCompatibilityWithHeliumR1Version() throws Exception {
952         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
953
954         verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
955                 eq(actorRef.path().toString()));
956     }
957
958     @Test
959     public void testReadyWithRecordingOperationFailure() throws Exception {
960         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
961
962         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
963
964         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
965                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
966
967         doReturn(Futures.failed(new TestException())).when(mockActorContext).
968                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
969
970         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
971                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
972
973         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
974
975         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
976                 WRITE_ONLY);
977
978         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
979
980         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
981
982         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
983
984         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
985
986         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
987
988         verifyCohortFutures(proxy, TestException.class);
989
990         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
991                 MergeDataReply.class, TestException.class);
992     }
993
994     @Test
995     public void testReadyWithReplyFailure() throws Exception {
996         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
997
998         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
999
1000         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1001                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1002
1003         doReturn(Futures.failed(new TestException())).when(mockActorContext).
1004                 executeOperationAsync(eq(actorSelection(actorRef)),
1005                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1006
1007         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1008                 WRITE_ONLY);
1009
1010         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1011
1012         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1013
1014         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1015
1016         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1017
1018         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1019                 MergeDataReply.class);
1020
1021         verifyCohortFutures(proxy, TestException.class);
1022     }
1023
1024     @Test
1025     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1026
1027         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1028                 mockActorContext).findPrimaryShardAsync(anyString());
1029
1030         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1031                 WRITE_ONLY);
1032
1033         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1034
1035         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1036
1037         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1038
1039         transactionProxy.delete(TestModel.TEST_PATH);
1040
1041         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1042
1043         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1044
1045         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1046
1047         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
1048     }
1049
1050     @Test
1051     public void testReadyWithInvalidReplyMessageType() throws Exception {
1052         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1053
1054         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1055
1056         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1057                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1058
1059         doReturn(Futures.successful(new Object())).when(mockActorContext).
1060                 executeOperationAsync(eq(actorSelection(actorRef)),
1061                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1062
1063         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1064                 WRITE_ONLY);
1065
1066         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1067
1068         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1069
1070         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1071
1072         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1073
1074         verifyCohortFutures(proxy, IllegalArgumentException.class);
1075     }
1076
1077     @Test
1078     public void testGetIdentifier() {
1079         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1080         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1081                 TransactionProxy.TransactionType.READ_ONLY);
1082
1083         Object id = transactionProxy.getIdentifier();
1084         assertNotNull("getIdentifier returned null", id);
1085         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1086     }
1087
1088     @Test
1089     public void testClose() throws Exception{
1090         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1091
1092         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1093                 eq(actorSelection(actorRef)), eqSerializedReadData());
1094
1095         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1096                 READ_WRITE);
1097
1098         transactionProxy.read(TestModel.TEST_PATH);
1099
1100         transactionProxy.close();
1101
1102         verify(mockActorContext).sendOperationAsync(
1103                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1104     }
1105
1106
1107     /**
1108      * Method to test a local Tx actor. The Tx paths are matched to decide if the
1109      * Tx actor is local or not. This is done by mocking the Tx actor path
1110      * and the caller paths and ensuring that the paths have the remote-address format
1111      *
1112      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1113      * the paths returned for the actors for all the tests are not qualified remote paths.
1114      * Hence are treated as non-local/remote actors. In short, all tests except
1115      * few below run for remote actors
1116      *
1117      * @throws Exception
1118      */
1119     @Test
1120     public void testLocalTxActorRead() throws Exception {
1121         ActorSystem actorSystem = getSystem();
1122         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1123
1124         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1125             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1126
1127         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1128             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1129
1130         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1131         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1132             .setTransactionId("txn-1")
1133             .setTransactionActorPath(actorPath)
1134             .build();
1135
1136         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1137             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1138                 eqCreateTransaction(memberName, READ_ONLY));
1139
1140         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1141
1142         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1143
1144         // negative test case with null as the reply
1145         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1146             any(ActorSelection.class), eqReadData());
1147
1148         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1149             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1150
1151         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1152
1153         // test case with node as read data reply
1154         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1155
1156         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1157             any(ActorSelection.class), eqReadData());
1158
1159         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1160
1161         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1162
1163         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1164
1165         // test for local data exists
1166         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1167             any(ActorSelection.class), eqDataExists());
1168
1169         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1170
1171         assertEquals("Exists response", true, exists);
1172     }
1173
1174     @Test
1175     public void testLocalTxActorWrite() throws Exception {
1176         ActorSystem actorSystem = getSystem();
1177         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1178
1179         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1180             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1181
1182         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1183             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1184
1185         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1186         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1187             .setTransactionId("txn-1")
1188             .setTransactionActorPath(actorPath)
1189             .build();
1190
1191         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1192         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1193                 eqCreateTransaction(memberName, WRITE_ONLY));
1194
1195         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1196
1197         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1198
1199         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1200             any(ActorSelection.class), eqWriteData(nodeToWrite));
1201
1202         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1203         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1204
1205         verify(mockActorContext).executeOperationAsync(
1206             any(ActorSelection.class), eqWriteData(nodeToWrite));
1207
1208         //testing local merge
1209         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1210             any(ActorSelection.class), eqMergeData(nodeToWrite));
1211
1212         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1213
1214         verify(mockActorContext).executeOperationAsync(
1215             any(ActorSelection.class), eqMergeData(nodeToWrite));
1216
1217
1218         //testing local delete
1219         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1220             any(ActorSelection.class), eqDeleteData());
1221
1222         transactionProxy.delete(TestModel.TEST_PATH);
1223
1224         verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1225
1226         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1227             WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1228
1229         // testing ready
1230         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1231             any(ActorSelection.class), isA(ReadyTransaction.class));
1232
1233         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1234
1235         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1236
1237         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1238
1239         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1240     }
1241
1242     private static interface TransactionProxyOperation {
1243         void run(TransactionProxy transactionProxy);
1244     }
1245
1246     private void throttleOperation(TransactionProxyOperation operation) {
1247         throttleOperation(operation, 1, true);
1248     }
1249
1250     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
1251         ActorSystem actorSystem = getSystem();
1252         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1253
1254         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
1255
1256         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1257                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1258
1259         if(shardFound) {
1260             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1261                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1262         } else {
1263             doReturn(Futures.failed(new Exception("not found")))
1264                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1265         }
1266
1267         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1268         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1269                 .setTransactionId("txn-1")
1270                 .setTransactionActorPath(actorPath)
1271                 .build();
1272
1273         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1274                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1275                         eqCreateTransaction(memberName, READ_WRITE));
1276
1277         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1278
1279         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1280
1281         long start = System.currentTimeMillis();
1282
1283         operation.run(transactionProxy);
1284
1285         long end = System.currentTimeMillis();
1286
1287         Assert.assertTrue(String.format("took less time than expected %s was %s",
1288                 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1289                 (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1290
1291     }
1292
1293     private void completeOperation(TransactionProxyOperation operation){
1294         completeOperation(operation, true);
1295     }
1296
1297     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
1298         ActorSystem actorSystem = getSystem();
1299         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1300
1301         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
1302
1303         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1304                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1305
1306         if(shardFound) {
1307             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1308                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1309         } else {
1310             doReturn(Futures.failed(new Exception("not found")))
1311                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1312         }
1313
1314         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1315         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1316                 .setTransactionId("txn-1")
1317                 .setTransactionActorPath(actorPath)
1318                 .build();
1319
1320         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1321                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1322                         eqCreateTransaction(memberName, READ_WRITE));
1323
1324         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1325
1326         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1327
1328         long start = System.currentTimeMillis();
1329
1330         operation.run(transactionProxy);
1331
1332         long end = System.currentTimeMillis();
1333
1334         Assert.assertTrue(String.format("took more time than expected %s was %s",
1335                 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1336                 (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1337     }
1338
1339     public void testWriteThrottling(boolean shardFound){
1340
1341         throttleOperation(new TransactionProxyOperation() {
1342             @Override
1343             public void run(TransactionProxy transactionProxy) {
1344                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1345
1346                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1347                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1348
1349                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1350
1351                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1352             }
1353         }, 1, shardFound);
1354     }
1355
1356     @Test
1357     public void testWriteThrottlingWhenShardFound(){
1358         throttleOperation(new TransactionProxyOperation() {
1359             @Override
1360             public void run(TransactionProxy transactionProxy) {
1361                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1362
1363                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1364                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1365
1366                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1367
1368                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1369             }
1370         });
1371
1372     }
1373
1374     @Test
1375     public void testWriteThrottlingWhenShardNotFound(){
1376         // Confirm that there is no throttling when the Shard is not found
1377         completeOperation(new TransactionProxyOperation() {
1378             @Override
1379             public void run(TransactionProxy transactionProxy) {
1380                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1381
1382                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1383                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1384
1385                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1386
1387                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1388             }
1389         }, false);
1390
1391     }
1392
1393
1394     @Test
1395     public void testWriteCompletion(){
1396         completeOperation(new TransactionProxyOperation() {
1397             @Override
1398             public void run(TransactionProxy transactionProxy) {
1399                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1400
1401                 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1402                         any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
1403
1404                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1405
1406                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1407             }
1408         });
1409
1410     }
1411
1412     @Test
1413     public void testMergeThrottlingWhenShardFound(){
1414
1415         throttleOperation(new TransactionProxyOperation() {
1416             @Override
1417             public void run(TransactionProxy transactionProxy) {
1418                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1419
1420                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1421                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1422
1423                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1424
1425                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1426             }
1427         });
1428     }
1429
1430     @Test
1431     public void testMergeThrottlingWhenShardNotFound(){
1432
1433         completeOperation(new TransactionProxyOperation() {
1434             @Override
1435             public void run(TransactionProxy transactionProxy) {
1436                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1437
1438                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1439                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1440
1441                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1442
1443                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1444             }
1445         }, false);
1446     }
1447
1448     @Test
1449     public void testMergeCompletion(){
1450         completeOperation(new TransactionProxyOperation() {
1451             @Override
1452             public void run(TransactionProxy transactionProxy) {
1453                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1454
1455                 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1456                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1457
1458                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1459
1460                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1461             }
1462         });
1463
1464     }
1465
1466     @Test
1467     public void testDeleteThrottlingWhenShardFound(){
1468
1469         throttleOperation(new TransactionProxyOperation() {
1470             @Override
1471             public void run(TransactionProxy transactionProxy) {
1472                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1473                         any(ActorSelection.class), eqDeleteData());
1474
1475                 transactionProxy.delete(TestModel.TEST_PATH);
1476
1477                 transactionProxy.delete(TestModel.TEST_PATH);
1478             }
1479         });
1480     }
1481
1482
1483     @Test
1484     public void testDeleteThrottlingWhenShardNotFound(){
1485
1486         completeOperation(new TransactionProxyOperation() {
1487             @Override
1488             public void run(TransactionProxy transactionProxy) {
1489                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1490                         any(ActorSelection.class), eqDeleteData());
1491
1492                 transactionProxy.delete(TestModel.TEST_PATH);
1493
1494                 transactionProxy.delete(TestModel.TEST_PATH);
1495             }
1496         }, false);
1497     }
1498
1499     @Test
1500     public void testDeleteCompletion(){
1501         completeOperation(new TransactionProxyOperation() {
1502             @Override
1503             public void run(TransactionProxy transactionProxy) {
1504                 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1505                         any(ActorSelection.class), eqDeleteData());
1506
1507                 transactionProxy.delete(TestModel.TEST_PATH);
1508
1509                 transactionProxy.delete(TestModel.TEST_PATH);
1510             }
1511         });
1512
1513     }
1514
1515     @Test
1516     public void testReadThrottlingWhenShardFound(){
1517
1518         throttleOperation(new TransactionProxyOperation() {
1519             @Override
1520             public void run(TransactionProxy transactionProxy) {
1521                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1522                         any(ActorSelection.class), eqReadData());
1523
1524                 transactionProxy.read(TestModel.TEST_PATH);
1525
1526                 transactionProxy.read(TestModel.TEST_PATH);
1527             }
1528         });
1529     }
1530
1531     @Test
1532     public void testReadThrottlingWhenShardNotFound(){
1533
1534         completeOperation(new TransactionProxyOperation() {
1535             @Override
1536             public void run(TransactionProxy transactionProxy) {
1537                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1538                         any(ActorSelection.class), eqReadData());
1539
1540                 transactionProxy.read(TestModel.TEST_PATH);
1541
1542                 transactionProxy.read(TestModel.TEST_PATH);
1543             }
1544         }, false);
1545     }
1546
1547
1548     @Test
1549     public void testReadCompletion(){
1550         completeOperation(new TransactionProxyOperation() {
1551             @Override
1552             public void run(TransactionProxy transactionProxy) {
1553                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1554
1555                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1556                         any(ActorSelection.class), eqReadData());
1557
1558                 transactionProxy.read(TestModel.TEST_PATH);
1559
1560                 transactionProxy.read(TestModel.TEST_PATH);
1561             }
1562         });
1563
1564     }
1565
1566     @Test
1567     public void testExistsThrottlingWhenShardFound(){
1568
1569         throttleOperation(new TransactionProxyOperation() {
1570             @Override
1571             public void run(TransactionProxy transactionProxy) {
1572                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1573                         any(ActorSelection.class), eqDataExists());
1574
1575                 transactionProxy.exists(TestModel.TEST_PATH);
1576
1577                 transactionProxy.exists(TestModel.TEST_PATH);
1578             }
1579         });
1580     }
1581
1582     @Test
1583     public void testExistsThrottlingWhenShardNotFound(){
1584
1585         completeOperation(new TransactionProxyOperation() {
1586             @Override
1587             public void run(TransactionProxy transactionProxy) {
1588                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1589                         any(ActorSelection.class), eqDataExists());
1590
1591                 transactionProxy.exists(TestModel.TEST_PATH);
1592
1593                 transactionProxy.exists(TestModel.TEST_PATH);
1594             }
1595         }, false);
1596     }
1597
1598
1599     @Test
1600     public void testExistsCompletion(){
1601         completeOperation(new TransactionProxyOperation() {
1602             @Override
1603             public void run(TransactionProxy transactionProxy) {
1604                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1605                         any(ActorSelection.class), eqDataExists());
1606
1607                 transactionProxy.exists(TestModel.TEST_PATH);
1608
1609                 transactionProxy.exists(TestModel.TEST_PATH);
1610             }
1611         });
1612
1613     }
1614
1615     @Test
1616     public void testReadyThrottling(){
1617
1618         throttleOperation(new TransactionProxyOperation() {
1619             @Override
1620             public void run(TransactionProxy transactionProxy) {
1621                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1622
1623                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1624                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1625
1626                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1627                         any(ActorSelection.class), any(ReadyTransaction.class));
1628
1629                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1630
1631                 transactionProxy.ready();
1632             }
1633         });
1634     }
1635
1636     @Test
1637     public void testReadyThrottlingWithTwoTransactionContexts(){
1638
1639         throttleOperation(new TransactionProxyOperation() {
1640             @Override
1641             public void run(TransactionProxy transactionProxy) {
1642                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1643                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1644
1645                 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1646                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1647
1648                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1649                         any(ActorSelection.class), eqWriteData(carsNode));
1650
1651                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1652                         any(ActorSelection.class), any(ReadyTransaction.class));
1653
1654                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1655
1656                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1657
1658                 transactionProxy.ready();
1659             }
1660         }, 2, true);
1661     }
1662 }