Merge "BUG-1493: activate recursion elision"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60
61 import java.util.List;
62 import java.util.concurrent.TimeUnit;
63
64 import static org.mockito.Matchers.any;
65 import static org.mockito.Matchers.anyString;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.doThrow;
68 import static org.mockito.Mockito.argThat;
69 import static org.mockito.Mockito.eq;
70 import static org.mockito.Mockito.verify;
71 import static org.mockito.Mockito.isA;
72 import static org.mockito.Mockito.times;
73
74 @SuppressWarnings("resource")
75 public class TransactionProxyTest extends AbstractActorTest {
76
77     @SuppressWarnings("serial")
78     static class TestException extends RuntimeException {
79     }
80
81     static interface Invoker {
82         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
83     }
84
85     private final Configuration configuration = new MockConfiguration();
86
87     @Mock
88     private ActorContext mockActorContext;
89
90     private SchemaContext schemaContext;
91
92     String memberName = "mock-member";
93
94     @Before
95     public void setUp(){
96         MockitoAnnotations.initMocks(this);
97
98         schemaContext = TestModel.createTestContext();
99
100         doReturn(getSystem()).when(mockActorContext).getActorSystem();
101         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
102
103         ShardStrategyFactory.setConfiguration(configuration);
104     }
105
106     private CreateTransaction eqCreateTransaction(final String memberName,
107             final TransactionType type) {
108         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
109             @Override
110             public boolean matches(Object argument) {
111                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
112                 return obj.getTransactionId().startsWith(memberName) &&
113                        obj.getTransactionType() == type.ordinal();
114             }
115         };
116
117         return argThat(matcher);
118     }
119
120     private DataExists eqDataExists() {
121         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
122             @Override
123             public boolean matches(Object argument) {
124                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
125                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
126             }
127         };
128
129         return argThat(matcher);
130     }
131
132     private ReadData eqReadData() {
133         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
134             @Override
135             public boolean matches(Object argument) {
136                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
137                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
138             }
139         };
140
141         return argThat(matcher);
142     }
143
144     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
145         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
146             @Override
147             public boolean matches(Object argument) {
148                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
149                     return false;
150                 }
151
152                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
153                 return obj.getPath().equals(TestModel.TEST_PATH) &&
154                        obj.getData().equals(nodeToWrite);
155             }
156         };
157
158         return argThat(matcher);
159     }
160
161     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
162         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
163             @Override
164             public boolean matches(Object argument) {
165                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
166                     return false;
167                 }
168
169                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
170                 return obj.getPath().equals(TestModel.TEST_PATH) &&
171                        obj.getData().equals(nodeToWrite);
172             }
173         };
174
175         return argThat(matcher);
176     }
177
178     private DeleteData eqDeleteData() {
179         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
180             @Override
181             public boolean matches(Object argument) {
182                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
183                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
184             }
185         };
186
187         return argThat(matcher);
188     }
189
190     private Future<Object> readyTxReply(ActorPath path) {
191         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
192     }
193
194     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
195         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
196     }
197
198     private Future<Object> dataExistsReply(boolean exists) {
199         return Futures.successful(new DataExistsReply(exists).toSerializable());
200     }
201
202     private Future<Object> writeDataReply() {
203         return Futures.successful(new WriteDataReply().toSerializable());
204     }
205
206     private Future<Object> mergeDataReply() {
207         return Futures.successful(new MergeDataReply().toSerializable());
208     }
209
210     private Future<Object> deleteDataReply() {
211         return Futures.successful(new DeleteDataReply().toSerializable());
212     }
213
214     private ActorSelection actorSelection(ActorRef actorRef) {
215         return getSystem().actorSelection(actorRef.path());
216     }
217
218     private FiniteDuration anyDuration() {
219         return any(FiniteDuration.class);
220     }
221
222     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
223         return CreateTransactionReply.newBuilder()
224             .setTransactionActorPath(actorRef.path().toString())
225             .setTransactionId("txn-1").build();
226     }
227
228     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
229         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
230         doReturn(getSystem().actorSelection(actorRef.path())).
231                 when(mockActorContext).actorSelection(actorRef.path().toString());
232         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
233                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
234                         eqCreateTransaction(memberName, type), anyDuration());
235         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
236                 anyString(), eq(actorRef.path().toString()));
237         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
238
239         return actorRef;
240     }
241
242     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
243             throws Throwable {
244
245         try {
246             future.checkedGet(5, TimeUnit.SECONDS);
247             fail("Expected ReadFailedException");
248         } catch(ReadFailedException e) {
249             throw e.getCause();
250         }
251     }
252
253     @Test
254     public void testRead() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
256
257         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
258                 READ_ONLY, schemaContext);
259
260         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
261                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
262
263         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
264                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
265
266         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
267
268         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
269
270         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
271                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
272
273         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
274
275         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
276
277         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
278     }
279
280     @Test(expected = ReadFailedException.class)
281     public void testReadWithInvalidReplyMessageType() throws Exception {
282         setupActorContextWithInitialCreateTransaction(READ_ONLY);
283
284         doReturn(Futures.successful(new Object())).when(mockActorContext).
285                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
286
287         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
288                 READ_ONLY, schemaContext);
289
290         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
291     }
292
293     @Test(expected = TestException.class)
294     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
295         setupActorContextWithInitialCreateTransaction(READ_ONLY);
296
297         doReturn(Futures.failed(new TestException())).when(mockActorContext).
298                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
299
300         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
301                 READ_ONLY, schemaContext);
302
303         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
304     }
305
306     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
307             throws Throwable {
308
309         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
310                 anyString(), any(), anyDuration());
311
312         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
313                 READ_ONLY, schemaContext);
314
315         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
316     }
317
318     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
319         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
320             @Override
321             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
322                 return proxy.read(TestModel.TEST_PATH);
323             }
324         });
325     }
326
327     @Test(expected = PrimaryNotFoundException.class)
328     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
329         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
330     }
331
332     @Test(expected = TimeoutException.class)
333     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
334         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
335                 new Exception("reason")));
336     }
337
338     @Test(expected = TestException.class)
339     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
340         testReadWithExceptionOnInitialCreateTransaction(new TestException());
341     }
342
343     @Test(expected = TestException.class)
344     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
345         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
346
347         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
348
349         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
350                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
351
352         doReturn(Futures.failed(new TestException())).when(mockActorContext).
353                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
354                         anyDuration());
355
356         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
357                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
358
359         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
360                 READ_WRITE, schemaContext);
361
362         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
363
364         transactionProxy.delete(TestModel.TEST_PATH);
365
366         try {
367             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
368         } finally {
369             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
370                     eq(actorSelection(actorRef)), eqReadData(), anyDuration());
371         }
372     }
373
374     @Test
375     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
376         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
377
378         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
379
380         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
381                 eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
382
383         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
384                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
385
386         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
387                 READ_WRITE, schemaContext);
388
389         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
390
391         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
392                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
393
394         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
395
396         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
397     }
398
399     @Test(expected=IllegalStateException.class)
400     public void testReadPreConditionCheck() {
401
402         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
403                 WRITE_ONLY, schemaContext);
404
405         transactionProxy.read(TestModel.TEST_PATH);
406     }
407
408     @Test
409     public void testExists() throws Exception {
410         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
411
412         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
413                 READ_ONLY, schemaContext);
414
415         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
416                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
417
418         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
419
420         assertEquals("Exists response", false, exists);
421
422         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
423                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
424
425         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
426
427         assertEquals("Exists response", true, exists);
428     }
429
430     @Test(expected = PrimaryNotFoundException.class)
431     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
432         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
433             @Override
434             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
435                 return proxy.exists(TestModel.TEST_PATH);
436             }
437         });
438     }
439
440     @Test(expected = ReadFailedException.class)
441     public void testExistsWithInvalidReplyMessageType() throws Exception {
442         setupActorContextWithInitialCreateTransaction(READ_ONLY);
443
444         doReturn(Futures.successful(new Object())).when(mockActorContext).
445                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
446
447         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
448                 READ_ONLY, schemaContext);
449
450         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
451     }
452
453     @Test(expected = TestException.class)
454     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
455         setupActorContextWithInitialCreateTransaction(READ_ONLY);
456
457         doReturn(Futures.failed(new TestException())).when(mockActorContext).
458                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
459
460         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
461                 READ_ONLY, schemaContext);
462
463         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
464     }
465
466     @Test(expected = TestException.class)
467     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
468         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
469
470         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
471
472         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
473                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
474
475         doReturn(Futures.failed(new TestException())).when(mockActorContext).
476                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
477                         anyDuration());
478
479         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
480                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
481
482         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
483                 READ_WRITE, schemaContext);
484
485         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
486
487         transactionProxy.delete(TestModel.TEST_PATH);
488
489         try {
490             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
491         } finally {
492             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
493                     eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
494         }
495     }
496
497     @Test
498     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
499         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
500
501         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502
503         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
504                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
505
506         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
507                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
508
509         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
510                 READ_WRITE, schemaContext);
511
512         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
513
514         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
515
516         assertEquals("Exists response", true, exists);
517     }
518
519     @Test(expected=IllegalStateException.class)
520     public void testxistsPreConditionCheck() {
521
522         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
523                 WRITE_ONLY, schemaContext);
524
525         transactionProxy.exists(TestModel.TEST_PATH);
526     }
527
528     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
529             Class<?>... expResultTypes) throws Exception {
530         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
531
532         int i = 0;
533         for( Future<Object> future: futures) {
534             assertNotNull("Recording operation Future is null", future);
535
536             Class<?> expResultType = expResultTypes[i++];
537             if(Throwable.class.isAssignableFrom(expResultType)) {
538                 try {
539                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
540                     fail("Expected exception from recording operation Future");
541                 } catch(Exception e) {
542                     // Expected
543                 }
544             } else {
545                 assertEquals("Recording operation Future result type", expResultType,
546                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
547             }
548         }
549     }
550
551     @Test
552     public void testWrite() throws Exception {
553         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
554
555         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
556
557         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
558                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
559
560         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
561                 WRITE_ONLY, schemaContext);
562
563         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
564
565         verify(mockActorContext).executeRemoteOperationAsync(
566                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
567
568         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
569                 WriteDataReply.SERIALIZABLE_CLASS);
570     }
571
572     @Test(expected=IllegalStateException.class)
573     public void testWritePreConditionCheck() {
574
575         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
576                 READ_ONLY, schemaContext);
577
578         transactionProxy.write(TestModel.TEST_PATH,
579                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
580     }
581
582     @Test(expected=IllegalStateException.class)
583     public void testWriteAfterReadyPreConditionCheck() {
584
585         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
586                 WRITE_ONLY, schemaContext);
587
588         transactionProxy.ready();
589
590         transactionProxy.write(TestModel.TEST_PATH,
591                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
592     }
593
594     @Test
595     public void testMerge() throws Exception {
596         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
597
598         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
599
600         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
601                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
602
603         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
604                 WRITE_ONLY, schemaContext);
605
606         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
607
608         verify(mockActorContext).executeRemoteOperationAsync(
609                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
610
611         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
612                 MergeDataReply.SERIALIZABLE_CLASS);
613     }
614
615     @Test
616     public void testDelete() throws Exception {
617         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
618
619         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
620                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
621
622         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
623                 WRITE_ONLY, schemaContext);
624
625         transactionProxy.delete(TestModel.TEST_PATH);
626
627         verify(mockActorContext).executeRemoteOperationAsync(
628                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
629
630         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
631                 DeleteDataReply.SERIALIZABLE_CLASS);
632     }
633
634     private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
635             Object... expReplies) throws Exception {
636         assertEquals("getReadyOperationFutures size", expReplies.length,
637                 proxy.getCohortPathFutures().size());
638
639         int i = 0;
640         for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
641             assertNotNull("Ready operation Future is null", future);
642
643             Object expReply = expReplies[i++];
644             if(expReply instanceof ActorPath) {
645                 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
646                 assertEquals("Cohort actor path", expReply, actual);
647             } else {
648                 // Expecting exception.
649                 try {
650                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
651                     fail("Expected exception from ready operation Future");
652                 } catch(Exception e) {
653                     // Expected
654                 }
655             }
656         }
657     }
658
659     @SuppressWarnings("unchecked")
660     @Test
661     public void testReady() throws Exception {
662         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
663
664         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
665
666         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
667                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
668
669         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
670                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
671
672         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
673                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
674
675         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
676                 READ_WRITE, schemaContext);
677
678         transactionProxy.read(TestModel.TEST_PATH);
679
680         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
681
682         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
683
684         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
685
686         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
687
688         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
689                 WriteDataReply.SERIALIZABLE_CLASS);
690
691         verifyCohortPathFutures(proxy, actorRef.path());
692     }
693
694     @SuppressWarnings("unchecked")
695     @Test
696     public void testReadyWithRecordingOperationFailure() throws Exception {
697         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
698
699         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
700
701         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
702                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
703
704         doReturn(Futures.failed(new TestException())).when(mockActorContext).
705                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
706                         anyDuration());
707
708         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
709                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
710
711         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
712                 WRITE_ONLY, schemaContext);
713
714         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
715
716         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
717
718         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
719
720         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
721
722         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
723
724         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
725                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
726
727         verifyCohortPathFutures(proxy, TestException.class);
728     }
729
730     @SuppressWarnings("unchecked")
731     @Test
732     public void testReadyWithReplyFailure() throws Exception {
733         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
734
735         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736
737         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
738                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
739
740         doReturn(Futures.failed(new TestException())).when(mockActorContext).
741                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
742                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
743
744         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
745                 WRITE_ONLY, schemaContext);
746
747         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
748
749         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
750
751         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
752
753         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
754
755         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
756                 MergeDataReply.SERIALIZABLE_CLASS);
757
758         verifyCohortPathFutures(proxy, TestException.class);
759     }
760
761     @Test
762     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
763
764         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
765                 anyString(), any(), anyDuration());
766
767         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
768                 WRITE_ONLY, schemaContext);
769
770         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
771
772         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
773
774         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
775
776         transactionProxy.delete(TestModel.TEST_PATH);
777
778         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
779
780         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
781
782         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
783
784         verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
785     }
786
787     @SuppressWarnings("unchecked")
788     @Test
789     public void testReadyWithInvalidReplyMessageType() throws Exception {
790         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
791
792         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
793
794         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
795                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
796
797         doReturn(Futures.successful(new Object())).when(mockActorContext).
798                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
799                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
800
801         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
802                 WRITE_ONLY, schemaContext);
803
804         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
805
806         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
807
808         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
809
810         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
811
812         verifyCohortPathFutures(proxy, IllegalArgumentException.class);
813     }
814
815     @Test
816     public void testGetIdentifier() {
817         setupActorContextWithInitialCreateTransaction(READ_ONLY);
818         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
819                 TransactionProxy.TransactionType.READ_ONLY, schemaContext);
820
821         Object id = transactionProxy.getIdentifier();
822         assertNotNull("getIdentifier returned null", id);
823         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
824     }
825
826     @SuppressWarnings("unchecked")
827     @Test
828     public void testClose() throws Exception{
829         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
830
831         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
832                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
833
834         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
835                 READ_WRITE, schemaContext);
836
837         transactionProxy.read(TestModel.TEST_PATH);
838
839         transactionProxy.close();
840
841         verify(mockActorContext).sendRemoteOperationAsync(
842                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
843     }
844 }