BUG 1623 - Clustering : Parsing Error thrown on startup
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60
61 import java.util.List;
62 import java.util.concurrent.TimeUnit;
63
64 import static org.mockito.Matchers.any;
65 import static org.mockito.Matchers.anyString;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.doThrow;
68 import static org.mockito.Mockito.argThat;
69 import static org.mockito.Mockito.eq;
70 import static org.mockito.Mockito.verify;
71 import static org.mockito.Mockito.isA;
72 import static org.mockito.Mockito.times;
73
74 @SuppressWarnings("resource")
75 public class TransactionProxyTest extends AbstractActorTest {
76
77     @SuppressWarnings("serial")
78     static class TestException extends RuntimeException {
79     }
80
81     static interface Invoker {
82         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
83     }
84
85     private final Configuration configuration = new MockConfiguration();
86
87     @Mock
88     private ActorContext mockActorContext;
89
90     private SchemaContext schemaContext;
91
92     String memberName = "mock-member";
93
94     @Before
95     public void setUp(){
96         MockitoAnnotations.initMocks(this);
97
98         schemaContext = TestModel.createTestContext();
99
100         doReturn(getSystem()).when(mockActorContext).getActorSystem();
101         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
102         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
103
104         ShardStrategyFactory.setConfiguration(configuration);
105     }
106
107     private CreateTransaction eqCreateTransaction(final String memberName,
108             final TransactionType type) {
109         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
110             @Override
111             public boolean matches(Object argument) {
112                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
113                 return obj.getTransactionId().startsWith(memberName) &&
114                        obj.getTransactionType() == type.ordinal();
115             }
116         };
117
118         return argThat(matcher);
119     }
120
121     private DataExists eqDataExists() {
122         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
123             @Override
124             public boolean matches(Object argument) {
125                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
126                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
127             }
128         };
129
130         return argThat(matcher);
131     }
132
133     private ReadData eqReadData() {
134         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
135             @Override
136             public boolean matches(Object argument) {
137                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
138                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
139             }
140         };
141
142         return argThat(matcher);
143     }
144
145     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
146         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
147             @Override
148             public boolean matches(Object argument) {
149                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
150                     return false;
151                 }
152
153                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
154                 return obj.getPath().equals(TestModel.TEST_PATH) &&
155                        obj.getData().equals(nodeToWrite);
156             }
157         };
158
159         return argThat(matcher);
160     }
161
162     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
163         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
164             @Override
165             public boolean matches(Object argument) {
166                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
167                     return false;
168                 }
169
170                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
171                 return obj.getPath().equals(TestModel.TEST_PATH) &&
172                        obj.getData().equals(nodeToWrite);
173             }
174         };
175
176         return argThat(matcher);
177     }
178
179     private DeleteData eqDeleteData() {
180         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
181             @Override
182             public boolean matches(Object argument) {
183                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
184                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
185             }
186         };
187
188         return argThat(matcher);
189     }
190
191     private Future<Object> readyTxReply(ActorPath path) {
192         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
193     }
194
195     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
196         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
197     }
198
199     private Future<Object> dataExistsReply(boolean exists) {
200         return Futures.successful(new DataExistsReply(exists).toSerializable());
201     }
202
203     private Future<Object> writeDataReply() {
204         return Futures.successful(new WriteDataReply().toSerializable());
205     }
206
207     private Future<Object> mergeDataReply() {
208         return Futures.successful(new MergeDataReply().toSerializable());
209     }
210
211     private Future<Object> deleteDataReply() {
212         return Futures.successful(new DeleteDataReply().toSerializable());
213     }
214
215     private ActorSelection actorSelection(ActorRef actorRef) {
216         return getSystem().actorSelection(actorRef.path());
217     }
218
219     private FiniteDuration anyDuration() {
220         return any(FiniteDuration.class);
221     }
222
223     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
224         return CreateTransactionReply.newBuilder()
225             .setTransactionActorPath(actorRef.path().toString())
226             .setTransactionId("txn-1").build();
227     }
228
229     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
230         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
231         doReturn(getSystem().actorSelection(actorRef.path())).
232                 when(mockActorContext).actorSelection(actorRef.path().toString());
233         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
234                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
235                         eqCreateTransaction(memberName, type), anyDuration());
236         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
237                 anyString(), eq(actorRef.path().toString()));
238         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
239
240         return actorRef;
241     }
242
243     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
244             throws Throwable {
245
246         try {
247             future.checkedGet(5, TimeUnit.SECONDS);
248             fail("Expected ReadFailedException");
249         } catch(ReadFailedException e) {
250             throw e.getCause();
251         }
252     }
253
254     @Test
255     public void testRead() throws Exception {
256         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
257
258         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
259                 READ_ONLY);
260
261         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
262                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
263
264         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
265                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
266
267         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
268
269         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
270
271         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
272                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
273
274         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
275
276         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
277
278         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
279     }
280
281     @Test(expected = ReadFailedException.class)
282     public void testReadWithInvalidReplyMessageType() throws Exception {
283         setupActorContextWithInitialCreateTransaction(READ_ONLY);
284
285         doReturn(Futures.successful(new Object())).when(mockActorContext).
286                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
287
288         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
289                 READ_ONLY);
290
291         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
292     }
293
294     @Test(expected = TestException.class)
295     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
296         setupActorContextWithInitialCreateTransaction(READ_ONLY);
297
298         doReturn(Futures.failed(new TestException())).when(mockActorContext).
299                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
300
301         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
302                 READ_ONLY);
303
304         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
305     }
306
307     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
308             throws Throwable {
309
310         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
311                 anyString(), any(), anyDuration());
312
313         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
314                 READ_ONLY);
315
316         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
317     }
318
319     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
320         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
321             @Override
322             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
323                 return proxy.read(TestModel.TEST_PATH);
324             }
325         });
326     }
327
328     @Test(expected = PrimaryNotFoundException.class)
329     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
330         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
331     }
332
333     @Test(expected = TimeoutException.class)
334     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
335         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
336                 new Exception("reason")));
337     }
338
339     @Test(expected = TestException.class)
340     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
341         testReadWithExceptionOnInitialCreateTransaction(new TestException());
342     }
343
344     @Test(expected = TestException.class)
345     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
346         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
347
348         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
349
350         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
351                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
352
353         doReturn(Futures.failed(new TestException())).when(mockActorContext).
354                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
355                         anyDuration());
356
357         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
358                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
359
360         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
361                 READ_WRITE);
362
363         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
364
365         transactionProxy.delete(TestModel.TEST_PATH);
366
367         try {
368             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
369         } finally {
370             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
371                     eq(actorSelection(actorRef)), eqReadData(), anyDuration());
372         }
373     }
374
375     @Test
376     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
377         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
378
379         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
380
381         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
382                 eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
383
384         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
385                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
386
387         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
388                 READ_WRITE);
389
390         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
391
392         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
393                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
394
395         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
396
397         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testReadPreConditionCheck() {
402
403         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
404                 WRITE_ONLY);
405
406         transactionProxy.read(TestModel.TEST_PATH);
407     }
408
409     @Test
410     public void testExists() throws Exception {
411         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
412
413         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
414                 READ_ONLY);
415
416         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
417                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
418
419         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
420
421         assertEquals("Exists response", false, exists);
422
423         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
424                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
425
426         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
427
428         assertEquals("Exists response", true, exists);
429     }
430
431     @Test(expected = PrimaryNotFoundException.class)
432     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
433         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
434             @Override
435             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
436                 return proxy.exists(TestModel.TEST_PATH);
437             }
438         });
439     }
440
441     @Test(expected = ReadFailedException.class)
442     public void testExistsWithInvalidReplyMessageType() throws Exception {
443         setupActorContextWithInitialCreateTransaction(READ_ONLY);
444
445         doReturn(Futures.successful(new Object())).when(mockActorContext).
446                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
447
448         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
449                 READ_ONLY);
450
451         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
452     }
453
454     @Test(expected = TestException.class)
455     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
456         setupActorContextWithInitialCreateTransaction(READ_ONLY);
457
458         doReturn(Futures.failed(new TestException())).when(mockActorContext).
459                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
460
461         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
462                 READ_ONLY);
463
464         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
465     }
466
467     @Test(expected = TestException.class)
468     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
474                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
475
476         doReturn(Futures.failed(new TestException())).when(mockActorContext).
477                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
478                         anyDuration());
479
480         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
481                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
482
483         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
484                 READ_WRITE);
485
486         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
487
488         transactionProxy.delete(TestModel.TEST_PATH);
489
490         try {
491             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
492         } finally {
493             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
494                     eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
495         }
496     }
497
498     @Test
499     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
500         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
501
502         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503
504         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
505                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
506
507         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
508                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
509
510         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
511                 READ_WRITE);
512
513         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
514
515         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
516
517         assertEquals("Exists response", true, exists);
518     }
519
520     @Test(expected=IllegalStateException.class)
521     public void testxistsPreConditionCheck() {
522
523         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
524                 WRITE_ONLY);
525
526         transactionProxy.exists(TestModel.TEST_PATH);
527     }
528
529     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
530             Class<?>... expResultTypes) throws Exception {
531         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
532
533         int i = 0;
534         for( Future<Object> future: futures) {
535             assertNotNull("Recording operation Future is null", future);
536
537             Class<?> expResultType = expResultTypes[i++];
538             if(Throwable.class.isAssignableFrom(expResultType)) {
539                 try {
540                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
541                     fail("Expected exception from recording operation Future");
542                 } catch(Exception e) {
543                     // Expected
544                 }
545             } else {
546                 assertEquals("Recording operation Future result type", expResultType,
547                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
548             }
549         }
550     }
551
552     @Test
553     public void testWrite() throws Exception {
554         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
555
556         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
557
558         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
559                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
560
561         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
562                 WRITE_ONLY);
563
564         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
565
566         verify(mockActorContext).executeRemoteOperationAsync(
567                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
568
569         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
570                 WriteDataReply.SERIALIZABLE_CLASS);
571     }
572
573     @Test(expected=IllegalStateException.class)
574     public void testWritePreConditionCheck() {
575
576         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
577                 READ_ONLY);
578
579         transactionProxy.write(TestModel.TEST_PATH,
580                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
581     }
582
583     @Test(expected=IllegalStateException.class)
584     public void testWriteAfterReadyPreConditionCheck() {
585
586         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
587                 WRITE_ONLY);
588
589         transactionProxy.ready();
590
591         transactionProxy.write(TestModel.TEST_PATH,
592                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
593     }
594
595     @Test
596     public void testMerge() throws Exception {
597         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
598
599         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
600
601         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
602                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
603
604         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
605                 WRITE_ONLY);
606
607         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
608
609         verify(mockActorContext).executeRemoteOperationAsync(
610                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
611
612         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
613                 MergeDataReply.SERIALIZABLE_CLASS);
614     }
615
616     @Test
617     public void testDelete() throws Exception {
618         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
619
620         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
621                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
622
623         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
624                 WRITE_ONLY);
625
626         transactionProxy.delete(TestModel.TEST_PATH);
627
628         verify(mockActorContext).executeRemoteOperationAsync(
629                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
630
631         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
632                 DeleteDataReply.SERIALIZABLE_CLASS);
633     }
634
635     private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
636             Object... expReplies) throws Exception {
637         assertEquals("getReadyOperationFutures size", expReplies.length,
638                 proxy.getCohortPathFutures().size());
639
640         int i = 0;
641         for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
642             assertNotNull("Ready operation Future is null", future);
643
644             Object expReply = expReplies[i++];
645             if(expReply instanceof ActorPath) {
646                 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
647                 assertEquals("Cohort actor path", expReply, actual);
648             } else {
649                 // Expecting exception.
650                 try {
651                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
652                     fail("Expected exception from ready operation Future");
653                 } catch(Exception e) {
654                     // Expected
655                 }
656             }
657         }
658     }
659
660     @SuppressWarnings("unchecked")
661     @Test
662     public void testReady() throws Exception {
663         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
664
665         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
666
667         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
668                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
669
670         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
671                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
672
673         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
674                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
675
676         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
677                 READ_WRITE);
678
679         transactionProxy.read(TestModel.TEST_PATH);
680
681         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
682
683         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684
685         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
686
687         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
688
689         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
690                 WriteDataReply.SERIALIZABLE_CLASS);
691
692         verifyCohortPathFutures(proxy, actorRef.path());
693     }
694
695     @SuppressWarnings("unchecked")
696     @Test
697     public void testReadyWithRecordingOperationFailure() throws Exception {
698         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
699
700         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701
702         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
703                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
704
705         doReturn(Futures.failed(new TestException())).when(mockActorContext).
706                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
707                         anyDuration());
708
709         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
710                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
711
712         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
713                 WRITE_ONLY);
714
715         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
716
717         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
718
719         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
720
721         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
722
723         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
724
725         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
726                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
727
728         verifyCohortPathFutures(proxy, TestException.class);
729     }
730
731     @SuppressWarnings("unchecked")
732     @Test
733     public void testReadyWithReplyFailure() throws Exception {
734         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
735
736         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
737
738         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
739                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
740
741         doReturn(Futures.failed(new TestException())).when(mockActorContext).
742                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
743                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
744
745         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
746                 WRITE_ONLY);
747
748         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
749
750         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
751
752         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
753
754         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
755
756         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
757                 MergeDataReply.SERIALIZABLE_CLASS);
758
759         verifyCohortPathFutures(proxy, TestException.class);
760     }
761
762     @Test
763     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
764
765         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
766                 anyString(), any(), anyDuration());
767
768         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
769                 WRITE_ONLY);
770
771         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
772
773         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
774
775         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
776
777         transactionProxy.delete(TestModel.TEST_PATH);
778
779         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
780
781         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
782
783         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
784
785         verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
786     }
787
788     @SuppressWarnings("unchecked")
789     @Test
790     public void testReadyWithInvalidReplyMessageType() throws Exception {
791         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
792
793         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
794
795         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
796                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
797
798         doReturn(Futures.successful(new Object())).when(mockActorContext).
799                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
800                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
801
802         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
803                 WRITE_ONLY);
804
805         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
806
807         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
808
809         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
810
811         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
812
813         verifyCohortPathFutures(proxy, IllegalArgumentException.class);
814     }
815
816     @Test
817     public void testGetIdentifier() {
818         setupActorContextWithInitialCreateTransaction(READ_ONLY);
819         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
820                 TransactionProxy.TransactionType.READ_ONLY);
821
822         Object id = transactionProxy.getIdentifier();
823         assertNotNull("getIdentifier returned null", id);
824         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
825     }
826
827     @SuppressWarnings("unchecked")
828     @Test
829     public void testClose() throws Exception{
830         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
831
832         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
833                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
834
835         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
836                 READ_WRITE);
837
838         transactionProxy.read(TestModel.TEST_PATH);
839
840         transactionProxy.close();
841
842         verify(mockActorContext).sendRemoteOperationAsync(
843                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
844     }
845 }