Merge "BUG 1623 - Clustering : Parsing Error thrown on startup"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import java.util.List;
60 import java.util.concurrent.TimeUnit;
61
62 import static org.mockito.Matchers.any;
63 import static org.mockito.Matchers.anyString;
64 import static org.mockito.Mockito.doReturn;
65 import static org.mockito.Mockito.doThrow;
66 import static org.mockito.Mockito.argThat;
67 import static org.mockito.Mockito.eq;
68 import static org.mockito.Mockito.verify;
69 import static org.mockito.Mockito.isA;
70 import static org.mockito.Mockito.times;
71
72 @SuppressWarnings("resource")
73 public class TransactionProxyTest extends AbstractActorTest {
74
75     @SuppressWarnings("serial")
76     static class TestException extends RuntimeException {
77     }
78
79     static interface Invoker {
80         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
81     }
82
83     private final Configuration configuration = new MockConfiguration();
84
85     @Mock
86     private ActorContext mockActorContext;
87
88     private SchemaContext schemaContext;
89
90     String memberName = "mock-member";
91
92     @Before
93     public void setUp(){
94         MockitoAnnotations.initMocks(this);
95
96         schemaContext = TestModel.createTestContext();
97
98         doReturn(getSystem()).when(mockActorContext).getActorSystem();
99         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
100         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
101
102         ShardStrategyFactory.setConfiguration(configuration);
103     }
104
105     private CreateTransaction eqCreateTransaction(final String memberName,
106             final TransactionType type) {
107         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
108             @Override
109             public boolean matches(Object argument) {
110                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
111                 return obj.getTransactionId().startsWith(memberName) &&
112                        obj.getTransactionType() == type.ordinal();
113             }
114         };
115
116         return argThat(matcher);
117     }
118
119     private DataExists eqDataExists() {
120         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
121             @Override
122             public boolean matches(Object argument) {
123                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
124                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
125             }
126         };
127
128         return argThat(matcher);
129     }
130
131     private ReadData eqReadData() {
132         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
133             @Override
134             public boolean matches(Object argument) {
135                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
136                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
137             }
138         };
139
140         return argThat(matcher);
141     }
142
143     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
144         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
145             @Override
146             public boolean matches(Object argument) {
147                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
148                     return false;
149                 }
150
151                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
152                 return obj.getPath().equals(TestModel.TEST_PATH) &&
153                        obj.getData().equals(nodeToWrite);
154             }
155         };
156
157         return argThat(matcher);
158     }
159
160     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
161         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
162             @Override
163             public boolean matches(Object argument) {
164                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
165                     return false;
166                 }
167
168                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
169                 return obj.getPath().equals(TestModel.TEST_PATH) &&
170                        obj.getData().equals(nodeToWrite);
171             }
172         };
173
174         return argThat(matcher);
175     }
176
177     private DeleteData eqDeleteData() {
178         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
179             @Override
180             public boolean matches(Object argument) {
181                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
182                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
183             }
184         };
185
186         return argThat(matcher);
187     }
188
189     private Future<Object> readyTxReply(ActorPath path) {
190         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
191     }
192
193     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
194         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
195     }
196
197     private Future<Object> dataExistsReply(boolean exists) {
198         return Futures.successful(new DataExistsReply(exists).toSerializable());
199     }
200
201     private Future<Object> writeDataReply() {
202         return Futures.successful(new WriteDataReply().toSerializable());
203     }
204
205     private Future<Object> mergeDataReply() {
206         return Futures.successful(new MergeDataReply().toSerializable());
207     }
208
209     private Future<Object> deleteDataReply() {
210         return Futures.successful(new DeleteDataReply().toSerializable());
211     }
212
213     private ActorSelection actorSelection(ActorRef actorRef) {
214         return getSystem().actorSelection(actorRef.path());
215     }
216
217     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
218         return CreateTransactionReply.newBuilder()
219             .setTransactionActorPath(actorRef.path().toString())
220             .setTransactionId("txn-1").build();
221     }
222
223     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
224         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
225         doReturn(getSystem().actorSelection(actorRef.path())).
226                 when(mockActorContext).actorSelection(actorRef.path().toString());
227         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
229                         eqCreateTransaction(memberName, type));
230         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
231                 anyString(), eq(actorRef.path().toString()));
232         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
233
234         return actorRef;
235     }
236
237     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
238             throws Throwable {
239
240         try {
241             future.checkedGet(5, TimeUnit.SECONDS);
242             fail("Expected ReadFailedException");
243         } catch(ReadFailedException e) {
244             throw e.getCause();
245         }
246     }
247
248     @Test
249     public void testRead() throws Exception {
250         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
251
252         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
253                 READ_ONLY);
254
255         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
256                 eq(actorSelection(actorRef)), eqReadData());
257
258         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
259                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
260
261         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
262
263         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
264
265         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
266                 eq(actorSelection(actorRef)), eqReadData());
267
268         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
269
270         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
271
272         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
273     }
274
275     @Test(expected = ReadFailedException.class)
276     public void testReadWithInvalidReplyMessageType() throws Exception {
277         setupActorContextWithInitialCreateTransaction(READ_ONLY);
278
279         doReturn(Futures.successful(new Object())).when(mockActorContext).
280                 executeRemoteOperationAsync(any(ActorSelection.class), any());
281
282         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
283                 READ_ONLY);
284
285         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
286     }
287
288     @Test(expected = TestException.class)
289     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
290         setupActorContextWithInitialCreateTransaction(READ_ONLY);
291
292         doReturn(Futures.failed(new TestException())).when(mockActorContext).
293                 executeRemoteOperationAsync(any(ActorSelection.class), any());
294
295         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
296                 READ_ONLY);
297
298         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
299     }
300
301     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
302             throws Throwable {
303
304         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
305                 anyString(), any());
306
307         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
308                 READ_ONLY);
309
310         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
311     }
312
313     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
314         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
315             @Override
316             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
317                 return proxy.read(TestModel.TEST_PATH);
318             }
319         });
320     }
321
322     @Test(expected = PrimaryNotFoundException.class)
323     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
324         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
325     }
326
327     @Test(expected = TimeoutException.class)
328     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
329         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
330                 new Exception("reason")));
331     }
332
333     @Test(expected = TestException.class)
334     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
335         testReadWithExceptionOnInitialCreateTransaction(new TestException());
336     }
337
338     @Test(expected = TestException.class)
339     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
340         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
341
342         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
343
344         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
345                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
346
347         doReturn(Futures.failed(new TestException())).when(mockActorContext).
348                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
349
350         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
351                 eq(actorSelection(actorRef)), eqReadData());
352
353         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
354                 READ_WRITE);
355
356         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
357
358         transactionProxy.delete(TestModel.TEST_PATH);
359
360         try {
361             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
362         } finally {
363             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
364                     eq(actorSelection(actorRef)), eqReadData());
365         }
366     }
367
368     @Test
369     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
370         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
371
372         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
373
374         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
375                 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
376
377         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
378                 eq(actorSelection(actorRef)), eqReadData());
379
380         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
381                 READ_WRITE);
382
383         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
384
385         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
386                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
387
388         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
389
390         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
391     }
392
393     @Test(expected=IllegalStateException.class)
394     public void testReadPreConditionCheck() {
395
396         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
397                 WRITE_ONLY);
398
399         transactionProxy.read(TestModel.TEST_PATH);
400     }
401
402     @Test
403     public void testExists() throws Exception {
404         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
405
406         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
407                 READ_ONLY);
408
409         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
410                 eq(actorSelection(actorRef)), eqDataExists());
411
412         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
413
414         assertEquals("Exists response", false, exists);
415
416         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
417                 eq(actorSelection(actorRef)), eqDataExists());
418
419         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
420
421         assertEquals("Exists response", true, exists);
422     }
423
424     @Test(expected = PrimaryNotFoundException.class)
425     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
426         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
427             @Override
428             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
429                 return proxy.exists(TestModel.TEST_PATH);
430             }
431         });
432     }
433
434     @Test(expected = ReadFailedException.class)
435     public void testExistsWithInvalidReplyMessageType() throws Exception {
436         setupActorContextWithInitialCreateTransaction(READ_ONLY);
437
438         doReturn(Futures.successful(new Object())).when(mockActorContext).
439                 executeRemoteOperationAsync(any(ActorSelection.class), any());
440
441         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
442                 READ_ONLY);
443
444         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
445     }
446
447     @Test(expected = TestException.class)
448     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
449         setupActorContextWithInitialCreateTransaction(READ_ONLY);
450
451         doReturn(Futures.failed(new TestException())).when(mockActorContext).
452                 executeRemoteOperationAsync(any(ActorSelection.class), any());
453
454         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
455                 READ_ONLY);
456
457         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
458     }
459
460     @Test(expected = TestException.class)
461     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
462         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
463
464         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
465
466         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
467                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
468
469         doReturn(Futures.failed(new TestException())).when(mockActorContext).
470                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
471
472         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
473                 eq(actorSelection(actorRef)), eqDataExists());
474
475         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
476                 READ_WRITE);
477
478         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
479
480         transactionProxy.delete(TestModel.TEST_PATH);
481
482         try {
483             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
484         } finally {
485             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
486                     eq(actorSelection(actorRef)), eqDataExists());
487         }
488     }
489
490     @Test
491     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
492         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
493
494         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
495
496         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
497                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
498
499         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
500                 eq(actorSelection(actorRef)), eqDataExists());
501
502         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
503                 READ_WRITE);
504
505         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
506
507         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
508
509         assertEquals("Exists response", true, exists);
510     }
511
512     @Test(expected=IllegalStateException.class)
513     public void testxistsPreConditionCheck() {
514
515         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
516                 WRITE_ONLY);
517
518         transactionProxy.exists(TestModel.TEST_PATH);
519     }
520
521     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
522             Class<?>... expResultTypes) throws Exception {
523         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
524
525         int i = 0;
526         for( Future<Object> future: futures) {
527             assertNotNull("Recording operation Future is null", future);
528
529             Class<?> expResultType = expResultTypes[i++];
530             if(Throwable.class.isAssignableFrom(expResultType)) {
531                 try {
532                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
533                     fail("Expected exception from recording operation Future");
534                 } catch(Exception e) {
535                     // Expected
536                 }
537             } else {
538                 assertEquals("Recording operation Future result type", expResultType,
539                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
540             }
541         }
542     }
543
544     @Test
545     public void testWrite() throws Exception {
546         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
547
548         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
549
550         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
551                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
552
553         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
554                 WRITE_ONLY);
555
556         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
557
558         verify(mockActorContext).executeRemoteOperationAsync(
559                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
560
561         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
562                 WriteDataReply.SERIALIZABLE_CLASS);
563     }
564
565     @Test(expected=IllegalStateException.class)
566     public void testWritePreConditionCheck() {
567
568         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
569                 READ_ONLY);
570
571         transactionProxy.write(TestModel.TEST_PATH,
572                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
573     }
574
575     @Test(expected=IllegalStateException.class)
576     public void testWriteAfterReadyPreConditionCheck() {
577
578         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
579                 WRITE_ONLY);
580
581         transactionProxy.ready();
582
583         transactionProxy.write(TestModel.TEST_PATH,
584                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
585     }
586
587     @Test
588     public void testMerge() throws Exception {
589         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
590
591         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
592
593         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
594                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
595
596         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
597                 WRITE_ONLY);
598
599         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
600
601         verify(mockActorContext).executeRemoteOperationAsync(
602                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
603
604         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
605                 MergeDataReply.SERIALIZABLE_CLASS);
606     }
607
608     @Test
609     public void testDelete() throws Exception {
610         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
611
612         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
613                 eq(actorSelection(actorRef)), eqDeleteData());
614
615         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
616                 WRITE_ONLY);
617
618         transactionProxy.delete(TestModel.TEST_PATH);
619
620         verify(mockActorContext).executeRemoteOperationAsync(
621                 eq(actorSelection(actorRef)), eqDeleteData());
622
623         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
624                 DeleteDataReply.SERIALIZABLE_CLASS);
625     }
626
627     private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
628             Object... expReplies) throws Exception {
629         assertEquals("getReadyOperationFutures size", expReplies.length,
630                 proxy.getCohortPathFutures().size());
631
632         int i = 0;
633         for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
634             assertNotNull("Ready operation Future is null", future);
635
636             Object expReply = expReplies[i++];
637             if(expReply instanceof ActorPath) {
638                 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
639                 assertEquals("Cohort actor path", expReply, actual);
640             } else {
641                 // Expecting exception.
642                 try {
643                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
644                     fail("Expected exception from ready operation Future");
645                 } catch(Exception e) {
646                     // Expected
647                 }
648             }
649         }
650     }
651
652     @SuppressWarnings("unchecked")
653     @Test
654     public void testReady() throws Exception {
655         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
656
657         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
658
659         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
660                 eq(actorSelection(actorRef)), eqReadData());
661
662         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
663                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
664
665         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
666                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
667
668         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
669                 READ_WRITE);
670
671         transactionProxy.read(TestModel.TEST_PATH);
672
673         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
674
675         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
676
677         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
678
679         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
680
681         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
682                 WriteDataReply.SERIALIZABLE_CLASS);
683
684         verifyCohortPathFutures(proxy, actorRef.path());
685     }
686
687     @SuppressWarnings("unchecked")
688     @Test
689     public void testReadyWithRecordingOperationFailure() throws Exception {
690         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
691
692         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
693
694         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
695                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
696
697         doReturn(Futures.failed(new TestException())).when(mockActorContext).
698                 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
699
700         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
701                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
702
703         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
704                 WRITE_ONLY);
705
706         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
707
708         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
709
710         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
711
712         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
713
714         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
715
716         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
717                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
718
719         verifyCohortPathFutures(proxy, TestException.class);
720     }
721
722     @SuppressWarnings("unchecked")
723     @Test
724     public void testReadyWithReplyFailure() throws Exception {
725         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
726
727         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
728
729         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
730                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
731
732         doReturn(Futures.failed(new TestException())).when(mockActorContext).
733                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
734                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
735
736         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
737                 WRITE_ONLY);
738
739         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
740
741         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
742
743         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
744
745         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
746
747         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
748                 MergeDataReply.SERIALIZABLE_CLASS);
749
750         verifyCohortPathFutures(proxy, TestException.class);
751     }
752
753     @Test
754     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
755
756         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
757                 anyString(), any());
758
759         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
760                 WRITE_ONLY);
761
762         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
763
764         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
765
766         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
767
768         transactionProxy.delete(TestModel.TEST_PATH);
769
770         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
771
772         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
773
774         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
775
776         verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
777     }
778
779     @SuppressWarnings("unchecked")
780     @Test
781     public void testReadyWithInvalidReplyMessageType() throws Exception {
782         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
783
784         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
785
786         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
787                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
788
789         doReturn(Futures.successful(new Object())).when(mockActorContext).
790                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
791                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
792
793         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
794                 WRITE_ONLY);
795
796         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
797
798         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
799
800         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
801
802         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
803
804         verifyCohortPathFutures(proxy, IllegalArgumentException.class);
805     }
806
807     @Test
808     public void testGetIdentifier() {
809         setupActorContextWithInitialCreateTransaction(READ_ONLY);
810         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
811                 TransactionProxy.TransactionType.READ_ONLY);
812
813         Object id = transactionProxy.getIdentifier();
814         assertNotNull("getIdentifier returned null", id);
815         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
816     }
817
818     @SuppressWarnings("unchecked")
819     @Test
820     public void testClose() throws Exception{
821         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
822
823         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
824                 eq(actorSelection(actorRef)), eqReadData());
825
826         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
827                 READ_WRITE);
828
829         transactionProxy.read(TestModel.TEST_PATH);
830
831         transactionProxy.close();
832
833         verify(mockActorContext).sendRemoteOperationAsync(
834                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
835     }
836 }