Merge "BUG 2518 : Throttle operations in a transaction"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
18 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSelection;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.dispatch.Futures;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.typesafe.config.Config;
29 import com.typesafe.config.ConfigFactory;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.AfterClass;
34 import org.junit.Assert;
35 import org.junit.Before;
36 import org.junit.BeforeClass;
37 import org.junit.Test;
38 import org.mockito.ArgumentMatcher;
39 import org.mockito.Mock;
40 import org.mockito.Mockito;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
43 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
45 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
48 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
50 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
51 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
52 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
58 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
59 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
60 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
61 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
62 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
64 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
67 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
73 import scala.concurrent.Await;
74 import scala.concurrent.Future;
75 import scala.concurrent.duration.Duration;
76
77 @SuppressWarnings("resource")
78 public class TransactionProxyTest {
79
80     @SuppressWarnings("serial")
81     static class TestException extends RuntimeException {
82     }
83
84     static interface Invoker {
85         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
86     }
87
88     private static ActorSystem system;
89
90     private final Configuration configuration = new MockConfiguration();
91
92     @Mock
93     private ActorContext mockActorContext;
94
95     private SchemaContext schemaContext;
96
97     @Mock
98     private ClusterWrapper mockClusterWrapper;
99
100     String memberName = "mock-member";
101
102     @BeforeClass
103     public static void setUpClass() throws IOException {
104
105         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
106                 put("akka.actor.default-dispatcher.type",
107                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
108                 withFallback(ConfigFactory.load());
109         system = ActorSystem.create("test", config);
110     }
111
112     @AfterClass
113     public static void tearDownClass() throws IOException {
114         JavaTestKit.shutdownActorSystem(system);
115         system = null;
116     }
117
118     @Before
119     public void setUp(){
120         MockitoAnnotations.initMocks(this);
121
122         schemaContext = TestModel.createTestContext();
123
124         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
125
126         doReturn(getSystem()).when(mockActorContext).getActorSystem();
127         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
128         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
129         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
130         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
131         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
132         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
133
134         ShardStrategyFactory.setConfiguration(configuration);
135     }
136
137     private ActorSystem getSystem() {
138         return system;
139     }
140
141     private CreateTransaction eqCreateTransaction(final String memberName,
142             final TransactionType type) {
143         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
144             @Override
145             public boolean matches(Object argument) {
146                 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
147                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
148                     return obj.getTransactionId().startsWith(memberName) &&
149                             obj.getTransactionType() == type.ordinal();
150                 }
151
152                 return false;
153             }
154         };
155
156         return argThat(matcher);
157     }
158
159     private DataExists eqSerializedDataExists() {
160         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
161             @Override
162             public boolean matches(Object argument) {
163                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
164                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
165             }
166         };
167
168         return argThat(matcher);
169     }
170
171     private DataExists eqDataExists() {
172         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
173             @Override
174             public boolean matches(Object argument) {
175                 return (argument instanceof DataExists) &&
176                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
177             }
178         };
179
180         return argThat(matcher);
181     }
182
183     private ReadData eqSerializedReadData() {
184         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
185             @Override
186             public boolean matches(Object argument) {
187                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
188                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
189             }
190         };
191
192         return argThat(matcher);
193     }
194
195     private ReadData eqReadData() {
196         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
197             @Override
198             public boolean matches(Object argument) {
199                 return (argument instanceof ReadData) &&
200                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
201             }
202         };
203
204         return argThat(matcher);
205     }
206
207     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
208         return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
209     }
210
211     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
212             final int transactionVersion) {
213         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
214             @Override
215             public boolean matches(Object argument) {
216                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
217                         WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
218                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
219                            ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
220
221                     WriteData obj = WriteData.fromSerializable(argument);
222                     return obj.getPath().equals(TestModel.TEST_PATH) &&
223                            obj.getData().equals(nodeToWrite);
224                 }
225
226                 return false;
227             }
228         };
229
230         return argThat(matcher);
231     }
232
233     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
234         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
235             @Override
236             public boolean matches(Object argument) {
237                 if(argument instanceof WriteData) {
238                     WriteData obj = (WriteData) argument;
239                     return obj.getPath().equals(TestModel.TEST_PATH) &&
240                         obj.getData().equals(nodeToWrite);
241                 }
242                 return false;
243             }
244         };
245
246         return argThat(matcher);
247     }
248
249     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
250         return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
251     }
252
253     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
254             final int transactionVersion) {
255         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
256             @Override
257             public boolean matches(Object argument) {
258                 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
259                         MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
260                    (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
261                            ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
262
263                     MergeData obj = MergeData.fromSerializable(argument);
264                     return obj.getPath().equals(TestModel.TEST_PATH) &&
265                            obj.getData().equals(nodeToWrite);
266                 }
267
268                 return false;
269             }
270         };
271
272         return argThat(matcher);
273     }
274
275     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
276         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
277             @Override
278             public boolean matches(Object argument) {
279                 if(argument instanceof MergeData) {
280                     MergeData obj = ((MergeData) argument);
281                     return obj.getPath().equals(TestModel.TEST_PATH) &&
282                         obj.getData().equals(nodeToWrite);
283                 }
284
285                return false;
286             }
287         };
288
289         return argThat(matcher);
290     }
291
292     private DeleteData eqSerializedDeleteData() {
293         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
294             @Override
295             public boolean matches(Object argument) {
296                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
297                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
298             }
299         };
300
301         return argThat(matcher);
302     }
303
304         private DeleteData eqDeleteData() {
305         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
306             @Override
307             public boolean matches(Object argument) {
308                 return argument instanceof DeleteData &&
309                     ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
310             }
311         };
312
313         return argThat(matcher);
314     }
315
316     private Future<Object> readySerializedTxReply(String path) {
317         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
318     }
319
320     private Future<Object> readyTxReply(String path) {
321         return Futures.successful((Object)new ReadyTransactionReply(path));
322     }
323
324     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
325             short transactionVersion) {
326         return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
327     }
328
329     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
330         return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
331     }
332
333     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
334         return Futures.successful(new ReadDataReply(data));
335     }
336
337     private Future<Object> dataExistsSerializedReply(boolean exists) {
338         return Futures.successful(new DataExistsReply(exists).toSerializable());
339     }
340
341     private Future<DataExistsReply> dataExistsReply(boolean exists) {
342         return Futures.successful(new DataExistsReply(exists));
343     }
344
345     private Future<Object> writeSerializedDataReply(short version) {
346         return Futures.successful(new WriteDataReply().toSerializable(version));
347     }
348
349     private Future<Object> writeSerializedDataReply() {
350         return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
351     }
352
353     private Future<WriteDataReply> writeDataReply() {
354         return Futures.successful(new WriteDataReply());
355     }
356
357     private Future<Object> mergeSerializedDataReply(short version) {
358         return Futures.successful(new MergeDataReply().toSerializable(version));
359     }
360
361     private Future<Object> mergeSerializedDataReply() {
362         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
363     }
364
365     private Future<Object> incompleteFuture(){
366         return mock(Future.class);
367     }
368
369     private Future<MergeDataReply> mergeDataReply() {
370         return Futures.successful(new MergeDataReply());
371     }
372
373     private Future<Object> deleteSerializedDataReply() {
374         return Futures.successful(new DeleteDataReply().toSerializable());
375     }
376
377     private Future<DeleteDataReply> deleteDataReply() {
378         return Futures.successful(new DeleteDataReply());
379     }
380
381     private ActorSelection actorSelection(ActorRef actorRef) {
382         return getSystem().actorSelection(actorRef.path());
383     }
384
385     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
386         return CreateTransactionReply.newBuilder()
387             .setTransactionActorPath(actorRef.path().toString())
388             .setTransactionId("txn-1")
389             .setMessageVersion(transactionVersion)
390             .build();
391     }
392
393     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
394             TransactionType type, int transactionVersion) {
395         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
396         doReturn(actorSystem.actorSelection(actorRef.path())).
397                 when(mockActorContext).actorSelection(actorRef.path().toString());
398
399         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
400                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
401
402         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
403                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
404                         eqCreateTransaction(memberName, type));
405
406         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
407
408         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
409
410         return actorRef;
411     }
412
413     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
414         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
415     }
416
417
418     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
419             throws Throwable {
420
421         try {
422             future.checkedGet(5, TimeUnit.SECONDS);
423             fail("Expected ReadFailedException");
424         } catch(ReadFailedException e) {
425             throw e.getCause();
426         }
427     }
428
429     @Test
430     public void testRead() throws Exception {
431         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
432
433         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
434                 READ_ONLY);
435
436         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
437                 eq(actorSelection(actorRef)), eqSerializedReadData());
438
439         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
440                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
441
442         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
443
444         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
445
446         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
447                 eq(actorSelection(actorRef)), eqSerializedReadData());
448
449         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
450
451         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
452
453         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
454     }
455
456     @Test(expected = ReadFailedException.class)
457     public void testReadWithInvalidReplyMessageType() throws Exception {
458         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
459
460         doReturn(Futures.successful(new Object())).when(mockActorContext).
461                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
462
463         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
464                 READ_ONLY);
465
466         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
467     }
468
469     @Test(expected = TestException.class)
470     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
471         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
472
473         doReturn(Futures.failed(new TestException())).when(mockActorContext).
474                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
475
476         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
477                 READ_ONLY);
478
479         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
480     }
481
482     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
483             throws Throwable {
484         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
485
486         if (exToThrow instanceof PrimaryNotFoundException) {
487             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
488         } else {
489             doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
490                     when(mockActorContext).findPrimaryShardAsync(anyString());
491         }
492
493         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
494                 any(ActorSelection.class), any());
495
496         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
497
498         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
499     }
500
501     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
502         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
503             @Override
504             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
505                 return proxy.read(TestModel.TEST_PATH);
506             }
507         });
508     }
509
510     @Test(expected = PrimaryNotFoundException.class)
511     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
512         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
513     }
514
515     @Test(expected = TimeoutException.class)
516     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
517         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
518                 new Exception("reason")));
519     }
520
521     @Test(expected = TestException.class)
522     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
523         testReadWithExceptionOnInitialCreateTransaction(new TestException());
524     }
525
526     @Test(expected = TestException.class)
527     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
528         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
529
530         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
531
532         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
533                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
534
535         doReturn(Futures.failed(new TestException())).when(mockActorContext).
536                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
537
538         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
539                 eq(actorSelection(actorRef)), eqSerializedReadData());
540
541         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
542                 READ_WRITE);
543
544         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
545
546         transactionProxy.delete(TestModel.TEST_PATH);
547
548         try {
549             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
550         } finally {
551             verify(mockActorContext, times(0)).executeOperationAsync(
552                     eq(actorSelection(actorRef)), eqSerializedReadData());
553         }
554     }
555
556     @Test
557     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
558         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
559
560         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
561
562         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
563                 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
564
565         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
566                 eq(actorSelection(actorRef)), eqSerializedReadData());
567
568         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
569                 READ_WRITE);
570
571         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
572
573         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
574                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
575
576         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
577
578         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
579     }
580
581     @Test(expected=IllegalStateException.class)
582     public void testReadPreConditionCheck() {
583
584         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
585                 WRITE_ONLY);
586
587         transactionProxy.read(TestModel.TEST_PATH);
588     }
589
590     @Test(expected=IllegalArgumentException.class)
591     public void testInvalidCreateTransactionReply() throws Throwable {
592         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
593
594         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
595             actorSelection(actorRef.path().toString());
596
597         doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
598             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
599
600         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
601             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
602
603         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
604
605         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
606     }
607
608     @Test
609     public void testExists() throws Exception {
610         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
611
612         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
613                 READ_ONLY);
614
615         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
616                 eq(actorSelection(actorRef)), eqSerializedDataExists());
617
618         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
619
620         assertEquals("Exists response", false, exists);
621
622         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
623                 eq(actorSelection(actorRef)), eqSerializedDataExists());
624
625         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
626
627         assertEquals("Exists response", true, exists);
628     }
629
630     @Test(expected = PrimaryNotFoundException.class)
631     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
632         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
633             @Override
634             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
635                 return proxy.exists(TestModel.TEST_PATH);
636             }
637         });
638     }
639
640     @Test(expected = ReadFailedException.class)
641     public void testExistsWithInvalidReplyMessageType() throws Exception {
642         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
643
644         doReturn(Futures.successful(new Object())).when(mockActorContext).
645                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
646
647         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
648                 READ_ONLY);
649
650         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
651     }
652
653     @Test(expected = TestException.class)
654     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
655         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
656
657         doReturn(Futures.failed(new TestException())).when(mockActorContext).
658                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
659
660         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
661                 READ_ONLY);
662
663         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
664     }
665
666     @Test(expected = TestException.class)
667     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
668         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
669
670         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
671
672         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
673                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
674
675         doReturn(Futures.failed(new TestException())).when(mockActorContext).
676                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
677
678         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
679                 eq(actorSelection(actorRef)), eqSerializedDataExists());
680
681         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
682                 READ_WRITE);
683
684         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
685
686         transactionProxy.delete(TestModel.TEST_PATH);
687
688         try {
689             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
690         } finally {
691             verify(mockActorContext, times(0)).executeOperationAsync(
692                     eq(actorSelection(actorRef)), eqSerializedDataExists());
693         }
694     }
695
696     @Test
697     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
698         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
699
700         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701
702         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
703                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
704
705         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
706                 eq(actorSelection(actorRef)), eqSerializedDataExists());
707
708         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
709                 READ_WRITE);
710
711         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
712
713         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
714
715         assertEquals("Exists response", true, exists);
716     }
717
718     @Test(expected=IllegalStateException.class)
719     public void testExistsPreConditionCheck() {
720
721         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
722                 WRITE_ONLY);
723
724         transactionProxy.exists(TestModel.TEST_PATH);
725     }
726
727     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
728             Class<?>... expResultTypes) throws Exception {
729         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
730
731         int i = 0;
732         for( Future<Object> future: futures) {
733             assertNotNull("Recording operation Future is null", future);
734
735             Class<?> expResultType = expResultTypes[i++];
736             if(Throwable.class.isAssignableFrom(expResultType)) {
737                 try {
738                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
739                     fail("Expected exception from recording operation Future");
740                 } catch(Exception e) {
741                     // Expected
742                 }
743             } else {
744                 assertEquals("Recording operation Future result type", expResultType,
745                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
746             }
747         }
748     }
749
750     @Test
751     public void testWrite() throws Exception {
752         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
753
754         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
755
756         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
757                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
758
759         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
760                 WRITE_ONLY);
761
762         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
763
764         verify(mockActorContext).executeOperationAsync(
765                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
766
767         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
768                 WriteDataReply.class);
769     }
770
771     @Test(expected=IllegalStateException.class)
772     public void testWritePreConditionCheck() {
773
774         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
775                 READ_ONLY);
776
777         transactionProxy.write(TestModel.TEST_PATH,
778                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
779     }
780
781     @Test(expected=IllegalStateException.class)
782     public void testWriteAfterReadyPreConditionCheck() {
783
784         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
785                 WRITE_ONLY);
786
787         transactionProxy.ready();
788
789         transactionProxy.write(TestModel.TEST_PATH,
790                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
791     }
792
793     @Test
794     public void testMerge() throws Exception {
795         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
796
797         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
798
799         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
800                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
801
802         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
803
804         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
805
806         verify(mockActorContext).executeOperationAsync(
807                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
808
809         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
810                 MergeDataReply.class);
811     }
812
813     @Test
814     public void testDelete() throws Exception {
815         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
816
817         doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
818                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
819
820         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
821                 WRITE_ONLY);
822
823         transactionProxy.delete(TestModel.TEST_PATH);
824
825         verify(mockActorContext).executeOperationAsync(
826                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
827
828         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
829                 DeleteDataReply.SERIALIZABLE_CLASS);
830     }
831
832     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
833         Object... expReplies) throws Exception {
834         assertEquals("getReadyOperationFutures size", expReplies.length,
835                 proxy.getCohortFutures().size());
836
837         int i = 0;
838         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
839             assertNotNull("Ready operation Future is null", future);
840
841             Object expReply = expReplies[i++];
842             if(expReply instanceof ActorSelection) {
843                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
844                 assertEquals("Cohort actor path", expReply, actual);
845             } else {
846                 // Expecting exception.
847                 try {
848                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
849                     fail("Expected exception from ready operation Future");
850                 } catch(Exception e) {
851                     // Expected
852                 }
853             }
854         }
855     }
856
857     @Test
858     public void testReady() throws Exception {
859         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
860
861         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
862
863         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
864                 eq(actorSelection(actorRef)), eqSerializedReadData());
865
866         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
867                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
868
869         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
870                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
871
872         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
873                 READ_WRITE);
874
875         transactionProxy.read(TestModel.TEST_PATH);
876
877         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
878
879         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
880
881         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
882
883         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
884
885         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
886                 WriteDataReply.class);
887
888         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
889     }
890
891     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
892         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
893                 READ_WRITE, version);
894
895         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
896
897         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
898                 eq(actorSelection(actorRef)), eqSerializedReadData());
899
900         doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
901                 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
902
903         doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
904                 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
905
906         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
907                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
908
909         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
910                 eq(actorRef.path().toString()));
911
912         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
913
914         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
915                 get(5, TimeUnit.SECONDS);
916
917         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
918         assertEquals("Response NormalizedNode", testNode, readOptional.get());
919
920         transactionProxy.write(TestModel.TEST_PATH, testNode);
921
922         transactionProxy.merge(TestModel.TEST_PATH, testNode);
923
924         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
925
926         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
927
928         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
929
930         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
931                 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
932
933         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
934
935         return actorRef;
936     }
937
938     @Test
939     public void testCompatibilityWithBaseHeliumVersion() throws Exception {
940         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
941
942         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
943                 eq(actorRef.path().toString()));
944     }
945
946     @Test
947     public void testCompatibilityWithHeliumR1Version() throws Exception {
948         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
949
950         verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
951                 eq(actorRef.path().toString()));
952     }
953
954     @Test
955     public void testReadyWithRecordingOperationFailure() throws Exception {
956         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
957
958         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
959
960         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
961                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
962
963         doReturn(Futures.failed(new TestException())).when(mockActorContext).
964                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
965
966         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
967                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
968
969         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
970
971         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
972                 WRITE_ONLY);
973
974         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
975
976         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
977
978         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
979
980         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
981
982         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
983
984         verifyCohortFutures(proxy, TestException.class);
985
986         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
987                 MergeDataReply.class, TestException.class);
988     }
989
990     @Test
991     public void testReadyWithReplyFailure() throws Exception {
992         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
993
994         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
995
996         doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
997                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
998
999         doReturn(Futures.failed(new TestException())).when(mockActorContext).
1000                 executeOperationAsync(eq(actorSelection(actorRef)),
1001                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1002
1003         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1004                 WRITE_ONLY);
1005
1006         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1007
1008         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1009
1010         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1011
1012         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1013
1014         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1015                 MergeDataReply.class);
1016
1017         verifyCohortFutures(proxy, TestException.class);
1018     }
1019
1020     @Test
1021     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1022
1023         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1024                 mockActorContext).findPrimaryShardAsync(anyString());
1025
1026         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1027                 WRITE_ONLY);
1028
1029         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1030
1031         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1032
1033         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1034
1035         transactionProxy.delete(TestModel.TEST_PATH);
1036
1037         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1038
1039         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1040
1041         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1042
1043         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
1044     }
1045
1046     @Test
1047     public void testReadyWithInvalidReplyMessageType() throws Exception {
1048         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1049
1050         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051
1052         doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1053                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1054
1055         doReturn(Futures.successful(new Object())).when(mockActorContext).
1056                 executeOperationAsync(eq(actorSelection(actorRef)),
1057                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
1058
1059         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1060                 WRITE_ONLY);
1061
1062         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1063
1064         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1065
1066         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1067
1068         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1069
1070         verifyCohortFutures(proxy, IllegalArgumentException.class);
1071     }
1072
1073     @Test
1074     public void testGetIdentifier() {
1075         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1076         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1077                 TransactionProxy.TransactionType.READ_ONLY);
1078
1079         Object id = transactionProxy.getIdentifier();
1080         assertNotNull("getIdentifier returned null", id);
1081         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1082     }
1083
1084     @Test
1085     public void testClose() throws Exception{
1086         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1087
1088         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1089                 eq(actorSelection(actorRef)), eqSerializedReadData());
1090
1091         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1092                 READ_WRITE);
1093
1094         transactionProxy.read(TestModel.TEST_PATH);
1095
1096         transactionProxy.close();
1097
1098         verify(mockActorContext).sendOperationAsync(
1099                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1100     }
1101
1102
1103     /**
1104      * Method to test a local Tx actor. The Tx paths are matched to decide if the
1105      * Tx actor is local or not. This is done by mocking the Tx actor path
1106      * and the caller paths and ensuring that the paths have the remote-address format
1107      *
1108      * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1109      * the paths returned for the actors for all the tests are not qualified remote paths.
1110      * Hence are treated as non-local/remote actors. In short, all tests except
1111      * few below run for remote actors
1112      *
1113      * @throws Exception
1114      */
1115     @Test
1116     public void testLocalTxActorRead() throws Exception {
1117         ActorSystem actorSystem = getSystem();
1118         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1119
1120         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1121             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1122
1123         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1124             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1125
1126         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1127         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1128             .setTransactionId("txn-1")
1129             .setTransactionActorPath(actorPath)
1130             .build();
1131
1132         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1133             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1134                 eqCreateTransaction(memberName, READ_ONLY));
1135
1136         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1137
1138         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1139
1140         // negative test case with null as the reply
1141         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1142             any(ActorSelection.class), eqReadData());
1143
1144         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1145             TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1146
1147         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1148
1149         // test case with node as read data reply
1150         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1151
1152         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1153             any(ActorSelection.class), eqReadData());
1154
1155         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1156
1157         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1158
1159         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1160
1161         // test for local data exists
1162         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1163             any(ActorSelection.class), eqDataExists());
1164
1165         boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1166
1167         assertEquals("Exists response", true, exists);
1168     }
1169
1170     @Test
1171     public void testLocalTxActorWrite() throws Exception {
1172         ActorSystem actorSystem = getSystem();
1173         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1174
1175         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1176             when(mockActorContext).actorSelection(shardActorRef.path().toString());
1177
1178         doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1179             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1180
1181         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1182         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1183             .setTransactionId("txn-1")
1184             .setTransactionActorPath(actorPath)
1185             .build();
1186
1187         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1188         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1189                 eqCreateTransaction(memberName, WRITE_ONLY));
1190
1191         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1192
1193         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1194
1195         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1196             any(ActorSelection.class), eqWriteData(nodeToWrite));
1197
1198         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1199         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1200
1201         verify(mockActorContext).executeOperationAsync(
1202             any(ActorSelection.class), eqWriteData(nodeToWrite));
1203
1204         //testing local merge
1205         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1206             any(ActorSelection.class), eqMergeData(nodeToWrite));
1207
1208         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1209
1210         verify(mockActorContext).executeOperationAsync(
1211             any(ActorSelection.class), eqMergeData(nodeToWrite));
1212
1213
1214         //testing local delete
1215         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1216             any(ActorSelection.class), eqDeleteData());
1217
1218         transactionProxy.delete(TestModel.TEST_PATH);
1219
1220         verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1221
1222         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1223             WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1224
1225         // testing ready
1226         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1227             any(ActorSelection.class), isA(ReadyTransaction.class));
1228
1229         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1230
1231         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1232
1233         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1234
1235         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1236     }
1237
1238     private static interface TransactionProxyOperation {
1239         void run(TransactionProxy transactionProxy);
1240     }
1241
1242     private void throttleOperation(TransactionProxyOperation operation) {
1243         throttleOperation(operation, 1, true);
1244     }
1245
1246     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
1247         ActorSystem actorSystem = getSystem();
1248         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1249
1250         doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
1251
1252         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1253                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1254
1255         if(shardFound) {
1256             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1257                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1258         } else {
1259             doReturn(Futures.failed(new Exception("not found")))
1260                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1261         }
1262
1263         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1264         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1265                 .setTransactionId("txn-1")
1266                 .setTransactionActorPath(actorPath)
1267                 .build();
1268
1269         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1270                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1271                         eqCreateTransaction(memberName, READ_WRITE));
1272
1273         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1274
1275         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1276
1277         long start = System.currentTimeMillis();
1278
1279         operation.run(transactionProxy);
1280
1281         long end = System.currentTimeMillis();
1282
1283         Assert.assertTrue(String.format("took less time than expected %s was %s",
1284                 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1285                 (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1286
1287     }
1288
1289     private void completeOperation(TransactionProxyOperation operation){
1290         completeOperation(operation, true);
1291     }
1292
1293     private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
1294         ActorSystem actorSystem = getSystem();
1295         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1296
1297         doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
1298
1299         doReturn(actorSystem.actorSelection(shardActorRef.path())).
1300                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1301
1302         if(shardFound) {
1303             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1304                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1305         } else {
1306             doReturn(Futures.failed(new Exception("not found")))
1307                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1308         }
1309
1310         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1311         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1312                 .setTransactionId("txn-1")
1313                 .setTransactionActorPath(actorPath)
1314                 .build();
1315
1316         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1317                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1318                         eqCreateTransaction(memberName, READ_WRITE));
1319
1320         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1321
1322         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1323
1324         long start = System.currentTimeMillis();
1325
1326         operation.run(transactionProxy);
1327
1328         long end = System.currentTimeMillis();
1329
1330         Assert.assertTrue(String.format("took more time than expected %s was %s",
1331                 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1332                 (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1333     }
1334
1335     public void testWriteThrottling(boolean shardFound){
1336
1337         throttleOperation(new TransactionProxyOperation() {
1338             @Override
1339             public void run(TransactionProxy transactionProxy) {
1340                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1341
1342                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1343                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1344
1345                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1346
1347                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1348             }
1349         }, 1, shardFound);
1350     }
1351
1352     @Test
1353     public void testWriteThrottlingWhenShardFound(){
1354         throttleOperation(new TransactionProxyOperation() {
1355             @Override
1356             public void run(TransactionProxy transactionProxy) {
1357                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1358
1359                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1360                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1361
1362                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1363
1364                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1365             }
1366         });
1367
1368     }
1369
1370     @Test
1371     public void testWriteThrottlingWhenShardNotFound(){
1372         // Confirm that there is no throttling when the Shard is not found
1373         completeOperation(new TransactionProxyOperation() {
1374             @Override
1375             public void run(TransactionProxy transactionProxy) {
1376                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1377
1378                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1379                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1380
1381                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1382
1383                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1384             }
1385         }, false);
1386
1387     }
1388
1389
1390     @Test
1391     public void testWriteCompletion(){
1392         completeOperation(new TransactionProxyOperation() {
1393             @Override
1394             public void run(TransactionProxy transactionProxy) {
1395                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1396
1397                 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1398                         any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
1399
1400                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1401
1402                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1403             }
1404         });
1405
1406     }
1407
1408     @Test
1409     public void testMergeThrottlingWhenShardFound(){
1410
1411         throttleOperation(new TransactionProxyOperation() {
1412             @Override
1413             public void run(TransactionProxy transactionProxy) {
1414                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1415
1416                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1417                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1418
1419                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1420
1421                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1422             }
1423         });
1424     }
1425
1426     @Test
1427     public void testMergeThrottlingWhenShardNotFound(){
1428
1429         completeOperation(new TransactionProxyOperation() {
1430             @Override
1431             public void run(TransactionProxy transactionProxy) {
1432                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1433
1434                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1435                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1436
1437                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1438
1439                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1440             }
1441         }, false);
1442     }
1443
1444     @Test
1445     public void testMergeCompletion(){
1446         completeOperation(new TransactionProxyOperation() {
1447             @Override
1448             public void run(TransactionProxy transactionProxy) {
1449                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1450
1451                 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1452                         any(ActorSelection.class), eqMergeData(nodeToMerge));
1453
1454                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1455
1456                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1457             }
1458         });
1459
1460     }
1461
1462     @Test
1463     public void testDeleteThrottlingWhenShardFound(){
1464
1465         throttleOperation(new TransactionProxyOperation() {
1466             @Override
1467             public void run(TransactionProxy transactionProxy) {
1468                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1469                         any(ActorSelection.class), eqDeleteData());
1470
1471                 transactionProxy.delete(TestModel.TEST_PATH);
1472
1473                 transactionProxy.delete(TestModel.TEST_PATH);
1474             }
1475         });
1476     }
1477
1478
1479     @Test
1480     public void testDeleteThrottlingWhenShardNotFound(){
1481
1482         completeOperation(new TransactionProxyOperation() {
1483             @Override
1484             public void run(TransactionProxy transactionProxy) {
1485                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1486                         any(ActorSelection.class), eqDeleteData());
1487
1488                 transactionProxy.delete(TestModel.TEST_PATH);
1489
1490                 transactionProxy.delete(TestModel.TEST_PATH);
1491             }
1492         }, false);
1493     }
1494
1495     @Test
1496     public void testDeleteCompletion(){
1497         completeOperation(new TransactionProxyOperation() {
1498             @Override
1499             public void run(TransactionProxy transactionProxy) {
1500                 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1501                         any(ActorSelection.class), eqDeleteData());
1502
1503                 transactionProxy.delete(TestModel.TEST_PATH);
1504
1505                 transactionProxy.delete(TestModel.TEST_PATH);
1506             }
1507         });
1508
1509     }
1510
1511     @Test
1512     public void testReadThrottlingWhenShardFound(){
1513
1514         throttleOperation(new TransactionProxyOperation() {
1515             @Override
1516             public void run(TransactionProxy transactionProxy) {
1517                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1518                         any(ActorSelection.class), eqReadData());
1519
1520                 transactionProxy.read(TestModel.TEST_PATH);
1521
1522                 transactionProxy.read(TestModel.TEST_PATH);
1523             }
1524         });
1525     }
1526
1527     @Test
1528     public void testReadThrottlingWhenShardNotFound(){
1529
1530         completeOperation(new TransactionProxyOperation() {
1531             @Override
1532             public void run(TransactionProxy transactionProxy) {
1533                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1534                         any(ActorSelection.class), eqReadData());
1535
1536                 transactionProxy.read(TestModel.TEST_PATH);
1537
1538                 transactionProxy.read(TestModel.TEST_PATH);
1539             }
1540         }, false);
1541     }
1542
1543
1544     @Test
1545     public void testReadCompletion(){
1546         completeOperation(new TransactionProxyOperation() {
1547             @Override
1548             public void run(TransactionProxy transactionProxy) {
1549                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1550
1551                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1552                         any(ActorSelection.class), eqReadData());
1553
1554                 transactionProxy.read(TestModel.TEST_PATH);
1555
1556                 transactionProxy.read(TestModel.TEST_PATH);
1557             }
1558         });
1559
1560     }
1561
1562     @Test
1563     public void testExistsThrottlingWhenShardFound(){
1564
1565         throttleOperation(new TransactionProxyOperation() {
1566             @Override
1567             public void run(TransactionProxy transactionProxy) {
1568                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1569                         any(ActorSelection.class), eqDataExists());
1570
1571                 transactionProxy.exists(TestModel.TEST_PATH);
1572
1573                 transactionProxy.exists(TestModel.TEST_PATH);
1574             }
1575         });
1576     }
1577
1578     @Test
1579     public void testExistsThrottlingWhenShardNotFound(){
1580
1581         completeOperation(new TransactionProxyOperation() {
1582             @Override
1583             public void run(TransactionProxy transactionProxy) {
1584                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1585                         any(ActorSelection.class), eqDataExists());
1586
1587                 transactionProxy.exists(TestModel.TEST_PATH);
1588
1589                 transactionProxy.exists(TestModel.TEST_PATH);
1590             }
1591         }, false);
1592     }
1593
1594
1595     @Test
1596     public void testExistsCompletion(){
1597         completeOperation(new TransactionProxyOperation() {
1598             @Override
1599             public void run(TransactionProxy transactionProxy) {
1600                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1601                         any(ActorSelection.class), eqDataExists());
1602
1603                 transactionProxy.exists(TestModel.TEST_PATH);
1604
1605                 transactionProxy.exists(TestModel.TEST_PATH);
1606             }
1607         });
1608
1609     }
1610
1611     @Test
1612     public void testReadyThrottling(){
1613
1614         throttleOperation(new TransactionProxyOperation() {
1615             @Override
1616             public void run(TransactionProxy transactionProxy) {
1617                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1618
1619                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1620                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1621
1622                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1623                         any(ActorSelection.class), any(ReadyTransaction.class));
1624
1625                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1626
1627                 transactionProxy.ready();
1628             }
1629         });
1630     }
1631
1632     @Test
1633     public void testReadyThrottlingWithTwoTransactionContexts(){
1634
1635         throttleOperation(new TransactionProxyOperation() {
1636             @Override
1637             public void run(TransactionProxy transactionProxy) {
1638                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1639                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1640
1641                 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1642                         any(ActorSelection.class), eqWriteData(nodeToWrite));
1643
1644                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1645                         any(ActorSelection.class), eqWriteData(carsNode));
1646
1647                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1648                         any(ActorSelection.class), any(ReadyTransaction.class));
1649
1650                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1651
1652                 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1653
1654                 transactionProxy.ready();
1655             }
1656         }, 2, true);
1657     }
1658 }