Merge "Issue fix for config subsystem"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import com.google.common.util.concurrent.CheckedFuture;
4
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.Props;
8 import akka.dispatch.Futures;
9 import com.google.common.base.Optional;
10 import org.junit.Before;
11 import org.junit.Test;
12 import org.mockito.ArgumentMatcher;
13 import org.mockito.Mock;
14 import org.mockito.MockitoAnnotations;
15 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
21 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
23 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
32 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
37 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import scala.concurrent.Await;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
47
48 import java.util.List;
49 import java.util.concurrent.TimeUnit;
50
51 import static org.junit.Assert.assertEquals;
52 import static org.junit.Assert.assertNotNull;
53 import static org.junit.Assert.assertTrue;
54 import static org.junit.Assert.fail;
55 import static org.mockito.Matchers.any;
56 import static org.mockito.Matchers.anyString;
57 import static org.mockito.Mockito.argThat;
58 import static org.mockito.Mockito.doReturn;
59 import static org.mockito.Mockito.doThrow;
60 import static org.mockito.Mockito.eq;
61 import static org.mockito.Mockito.isA;
62 import static org.mockito.Mockito.times;
63 import static org.mockito.Mockito.verify;
64 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
65 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
66 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
67
68 @SuppressWarnings("resource")
69 public class TransactionProxyTest extends AbstractActorTest {
70
71     @SuppressWarnings("serial")
72     static class TestException extends RuntimeException {
73     }
74
75     static interface Invoker {
76         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
77     }
78
79     private final Configuration configuration = new MockConfiguration();
80
81     @Mock
82     private ActorContext mockActorContext;
83
84     private SchemaContext schemaContext;
85
86     String memberName = "mock-member";
87
88     @Before
89     public void setUp(){
90         MockitoAnnotations.initMocks(this);
91
92         schemaContext = TestModel.createTestContext();
93
94         doReturn(getSystem()).when(mockActorContext).getActorSystem();
95         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
96         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
97
98         ShardStrategyFactory.setConfiguration(configuration);
99     }
100
101     private CreateTransaction eqCreateTransaction(final String memberName,
102             final TransactionType type) {
103         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
104             @Override
105             public boolean matches(Object argument) {
106                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
107                 return obj.getTransactionId().startsWith(memberName) &&
108                        obj.getTransactionType() == type.ordinal();
109             }
110         };
111
112         return argThat(matcher);
113     }
114
115     private DataExists eqDataExists() {
116         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
117             @Override
118             public boolean matches(Object argument) {
119                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
120                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
121             }
122         };
123
124         return argThat(matcher);
125     }
126
127     private ReadData eqReadData() {
128         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
129             @Override
130             public boolean matches(Object argument) {
131                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
132                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
133             }
134         };
135
136         return argThat(matcher);
137     }
138
139     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
140         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
141             @Override
142             public boolean matches(Object argument) {
143                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
144                     return false;
145                 }
146
147                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
148                 return obj.getPath().equals(TestModel.TEST_PATH) &&
149                        obj.getData().equals(nodeToWrite);
150             }
151         };
152
153         return argThat(matcher);
154     }
155
156     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
157         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
158             @Override
159             public boolean matches(Object argument) {
160                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
161                     return false;
162                 }
163
164                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
165                 return obj.getPath().equals(TestModel.TEST_PATH) &&
166                        obj.getData().equals(nodeToWrite);
167             }
168         };
169
170         return argThat(matcher);
171     }
172
173     private DeleteData eqDeleteData() {
174         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
175             @Override
176             public boolean matches(Object argument) {
177                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
179             }
180         };
181
182         return argThat(matcher);
183     }
184
185     private Future<Object> readyTxReply(String path) {
186         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
187     }
188
189     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
190         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
191     }
192
193     private Future<Object> dataExistsReply(boolean exists) {
194         return Futures.successful(new DataExistsReply(exists).toSerializable());
195     }
196
197     private Future<Object> writeDataReply() {
198         return Futures.successful(new WriteDataReply().toSerializable());
199     }
200
201     private Future<Object> mergeDataReply() {
202         return Futures.successful(new MergeDataReply().toSerializable());
203     }
204
205     private Future<Object> deleteDataReply() {
206         return Futures.successful(new DeleteDataReply().toSerializable());
207     }
208
209     private ActorSelection actorSelection(ActorRef actorRef) {
210         return getSystem().actorSelection(actorRef.path());
211     }
212
213     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
214         return CreateTransactionReply.newBuilder()
215             .setTransactionActorPath(actorRef.path().toString())
216             .setTransactionId("txn-1").build();
217     }
218
219     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
220         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
221         doReturn(getSystem().actorSelection(actorRef.path())).
222                 when(mockActorContext).actorSelection(actorRef.path().toString());
223
224         doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
225                 when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228                 executeOperation(eq(getSystem().actorSelection(actorRef.path())),
229                         eqCreateTransaction(memberName, type));
230         return actorRef;
231     }
232
233     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
234             throws Throwable {
235
236         try {
237             future.checkedGet(5, TimeUnit.SECONDS);
238             fail("Expected ReadFailedException");
239         } catch(ReadFailedException e) {
240             throw e.getCause();
241         }
242     }
243
244     @Test
245     public void testRead() throws Exception {
246         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
247
248         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
249                 READ_ONLY);
250
251         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
252                 eq(actorSelection(actorRef)), eqReadData());
253
254         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
255                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
256
257         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
258
259         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
260
261         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
262                 eq(actorSelection(actorRef)), eqReadData());
263
264         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
265
266         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
267
268         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
269     }
270
271     @Test(expected = ReadFailedException.class)
272     public void testReadWithInvalidReplyMessageType() throws Exception {
273         setupActorContextWithInitialCreateTransaction(READ_ONLY);
274
275         doReturn(Futures.successful(new Object())).when(mockActorContext).
276                 executeOperationAsync(any(ActorSelection.class), any());
277
278         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
279                 READ_ONLY);
280
281         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
282     }
283
284     @Test(expected = TestException.class)
285     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
286         setupActorContextWithInitialCreateTransaction(READ_ONLY);
287
288         doReturn(Futures.failed(new TestException())).when(mockActorContext).
289                 executeOperationAsync(any(ActorSelection.class), any());
290
291         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
292                 READ_ONLY);
293
294         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
295     }
296
297     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
298             throws Throwable {
299         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
300
301         if (exToThrow instanceof PrimaryNotFoundException) {
302             doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
303         } else {
304             doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
305                     when(mockActorContext).findPrimaryShard(anyString());
306         }
307         doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
308
309         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
310
311         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
312     }
313
314     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
315         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
316             @Override
317             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
318                 return proxy.read(TestModel.TEST_PATH);
319             }
320         });
321     }
322
323     @Test(expected = PrimaryNotFoundException.class)
324     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
325         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
326     }
327
328     @Test(expected = TimeoutException.class)
329     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
330         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
331                 new Exception("reason")));
332     }
333
334     @Test(expected = TestException.class)
335     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
336         testReadWithExceptionOnInitialCreateTransaction(new TestException());
337     }
338
339     @Test(expected = TestException.class)
340     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
341         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
342
343         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
344
345         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
346                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
347
348         doReturn(Futures.failed(new TestException())).when(mockActorContext).
349                 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
350
351         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
352                 eq(actorSelection(actorRef)), eqReadData());
353
354         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
355                 READ_WRITE);
356
357         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
358
359         transactionProxy.delete(TestModel.TEST_PATH);
360
361         try {
362             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
363         } finally {
364             verify(mockActorContext, times(0)).executeOperationAsync(
365                     eq(actorSelection(actorRef)), eqReadData());
366         }
367     }
368
369     @Test
370     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
371         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
372
373         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
374
375         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
376                 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
377
378         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
379                 eq(actorSelection(actorRef)), eqReadData());
380
381         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
382                 READ_WRITE);
383
384         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
385
386         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
387                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
388
389         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
390
391         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
392     }
393
394     @Test(expected=IllegalStateException.class)
395     public void testReadPreConditionCheck() {
396
397         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
398                 WRITE_ONLY);
399
400         transactionProxy.read(TestModel.TEST_PATH);
401     }
402
403     @Test
404     public void testExists() throws Exception {
405         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
406
407         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
408                 READ_ONLY);
409
410         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
411                 eq(actorSelection(actorRef)), eqDataExists());
412
413         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
414
415         assertEquals("Exists response", false, exists);
416
417         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
418                 eq(actorSelection(actorRef)), eqDataExists());
419
420         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
421
422         assertEquals("Exists response", true, exists);
423     }
424
425     @Test(expected = PrimaryNotFoundException.class)
426     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
427         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
428             @Override
429             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
430                 return proxy.exists(TestModel.TEST_PATH);
431             }
432         });
433     }
434
435     @Test(expected = ReadFailedException.class)
436     public void testExistsWithInvalidReplyMessageType() throws Exception {
437         setupActorContextWithInitialCreateTransaction(READ_ONLY);
438
439         doReturn(Futures.successful(new Object())).when(mockActorContext).
440                 executeOperationAsync(any(ActorSelection.class), any());
441
442         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
443                 READ_ONLY);
444
445         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
446     }
447
448     @Test(expected = TestException.class)
449     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
450         setupActorContextWithInitialCreateTransaction(READ_ONLY);
451
452         doReturn(Futures.failed(new TestException())).when(mockActorContext).
453                 executeOperationAsync(any(ActorSelection.class), any());
454
455         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
456                 READ_ONLY);
457
458         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
459     }
460
461     @Test(expected = TestException.class)
462     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
463         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
464
465         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
466
467         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
468                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
469
470         doReturn(Futures.failed(new TestException())).when(mockActorContext).
471                 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
472
473         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqDataExists());
475
476         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
477                 READ_WRITE);
478
479         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
480
481         transactionProxy.delete(TestModel.TEST_PATH);
482
483         try {
484             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
485         } finally {
486             verify(mockActorContext, times(0)).executeOperationAsync(
487                     eq(actorSelection(actorRef)), eqDataExists());
488         }
489     }
490
491     @Test
492     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
493         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
494
495         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
496
497         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
498                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
499
500         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
501                 eq(actorSelection(actorRef)), eqDataExists());
502
503         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
504                 READ_WRITE);
505
506         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
507
508         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
509
510         assertEquals("Exists response", true, exists);
511     }
512
513     @Test(expected=IllegalStateException.class)
514     public void testxistsPreConditionCheck() {
515
516         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
517                 WRITE_ONLY);
518
519         transactionProxy.exists(TestModel.TEST_PATH);
520     }
521
522     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
523             Class<?>... expResultTypes) throws Exception {
524         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
525
526         int i = 0;
527         for( Future<Object> future: futures) {
528             assertNotNull("Recording operation Future is null", future);
529
530             Class<?> expResultType = expResultTypes[i++];
531             if(Throwable.class.isAssignableFrom(expResultType)) {
532                 try {
533                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
534                     fail("Expected exception from recording operation Future");
535                 } catch(Exception e) {
536                     // Expected
537                 }
538             } else {
539                 assertEquals("Recording operation Future result type", expResultType,
540                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
541             }
542         }
543     }
544
545     @Test
546     public void testWrite() throws Exception {
547         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
548
549         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
550
551         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
552                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
553
554         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
555                 WRITE_ONLY);
556
557         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
558
559         verify(mockActorContext).executeOperationAsync(
560                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
561
562         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
563                 WriteDataReply.SERIALIZABLE_CLASS);
564     }
565
566     @Test(expected=IllegalStateException.class)
567     public void testWritePreConditionCheck() {
568
569         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
570                 READ_ONLY);
571
572         transactionProxy.write(TestModel.TEST_PATH,
573                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
574     }
575
576     @Test(expected=IllegalStateException.class)
577     public void testWriteAfterReadyPreConditionCheck() {
578
579         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
580                 WRITE_ONLY);
581
582         transactionProxy.ready();
583
584         transactionProxy.write(TestModel.TEST_PATH,
585                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
586     }
587
588     @Test
589     public void testMerge() throws Exception {
590         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
591
592         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
593
594         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
595                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
596
597         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
598                 WRITE_ONLY);
599
600         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
601
602         verify(mockActorContext).executeOperationAsync(
603                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
604
605         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
606                 MergeDataReply.SERIALIZABLE_CLASS);
607     }
608
609     @Test
610     public void testDelete() throws Exception {
611         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
612
613         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
614                 eq(actorSelection(actorRef)), eqDeleteData());
615
616         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
617                 WRITE_ONLY);
618
619         transactionProxy.delete(TestModel.TEST_PATH);
620
621         verify(mockActorContext).executeOperationAsync(
622                 eq(actorSelection(actorRef)), eqDeleteData());
623
624         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
625                 DeleteDataReply.SERIALIZABLE_CLASS);
626     }
627
628     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
629         Object... expReplies) throws Exception {
630         assertEquals("getReadyOperationFutures size", expReplies.length,
631                 proxy.getCohortFutures().size());
632
633         int i = 0;
634         for( Future<ActorSelection> future: proxy.getCohortFutures()) {
635             assertNotNull("Ready operation Future is null", future);
636
637             Object expReply = expReplies[i++];
638             if(expReply instanceof ActorSelection) {
639                 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
640                 assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
641             } else {
642                 // Expecting exception.
643                 try {
644                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
645                     fail("Expected exception from ready operation Future");
646                 } catch(Exception e) {
647                     // Expected
648                 }
649             }
650         }
651     }
652
653     @SuppressWarnings("unchecked")
654     @Test
655     public void testReady() throws Exception {
656         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
657
658         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
659
660         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
661                 eq(actorSelection(actorRef)), eqReadData());
662
663         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
664                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
665
666         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
667                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
668
669         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
670                 READ_WRITE);
671
672         transactionProxy.read(TestModel.TEST_PATH);
673
674         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
675
676         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
677
678         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
679
680         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
681
682         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
683                 WriteDataReply.SERIALIZABLE_CLASS);
684
685         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
686     }
687
688     @SuppressWarnings("unchecked")
689     @Test
690     public void testReadyWithRecordingOperationFailure() throws Exception {
691         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
692
693         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
694
695         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
696                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
697
698         doReturn(Futures.failed(new TestException())).when(mockActorContext).
699                 executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
700
701         doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
702                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
703
704         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
705                 WRITE_ONLY);
706
707         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
708
709         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
710
711         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
712
713         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
714
715         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
716
717         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
718                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
719
720         verifyCohortFutures(proxy, TestException.class);
721     }
722
723     @SuppressWarnings("unchecked")
724     @Test
725     public void testReadyWithReplyFailure() throws Exception {
726         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
727
728         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
729
730         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
731                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
732
733         doReturn(Futures.failed(new TestException())).when(mockActorContext).
734                 executeOperationAsync(eq(actorSelection(actorRef)),
735                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
736
737         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
738                 WRITE_ONLY);
739
740         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
741
742         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
743
744         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
745
746         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
747
748         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
749                 MergeDataReply.SERIALIZABLE_CLASS);
750
751         verifyCohortFutures(proxy, TestException.class);
752     }
753
754     @Test
755     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
756
757         doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
758 //        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
759 //                anyString(), any());
760
761         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
762                 WRITE_ONLY);
763
764         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
765
766         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
767
768         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
769
770         transactionProxy.delete(TestModel.TEST_PATH);
771
772         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
773
774         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
775
776         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
777
778         verifyCohortFutures(proxy, PrimaryNotFoundException.class);
779     }
780
781     @SuppressWarnings("unchecked")
782     @Test
783     public void testReadyWithInvalidReplyMessageType() throws Exception {
784         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
785
786         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
787
788         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
789                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
790
791         doReturn(Futures.successful(new Object())).when(mockActorContext).
792                 executeOperationAsync(eq(actorSelection(actorRef)),
793                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
794
795         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
796                 WRITE_ONLY);
797
798         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
799
800         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
801
802         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
803
804         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
805
806         verifyCohortFutures(proxy, IllegalArgumentException.class);
807     }
808
809     @Test
810     public void testGetIdentifier() {
811         setupActorContextWithInitialCreateTransaction(READ_ONLY);
812         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
813                 TransactionProxy.TransactionType.READ_ONLY);
814
815         Object id = transactionProxy.getIdentifier();
816         assertNotNull("getIdentifier returned null", id);
817         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
818     }
819
820     @SuppressWarnings("unchecked")
821     @Test
822     public void testClose() throws Exception{
823         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
824
825         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
826                 eq(actorSelection(actorRef)), eqReadData());
827
828         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
829                 READ_WRITE);
830
831         transactionProxy.read(TestModel.TEST_PATH);
832
833         transactionProxy.close();
834
835         verify(mockActorContext).sendOperationAsync(
836                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
837     }
838 }