Merge "BUG-1521 netconf-impl & netconf-monitoring line coverage."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.Props;
7 import akka.dispatch.Futures;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.CheckedFuture;
10 import org.junit.Before;
11 import org.junit.Test;
12 import org.mockito.ArgumentMatcher;
13 import org.mockito.Mock;
14 import org.mockito.MockitoAnnotations;
15 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
21 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
23 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
32 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
37 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import scala.concurrent.Await;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
47
48 import java.util.List;
49 import java.util.concurrent.TimeUnit;
50
51 import static org.junit.Assert.assertEquals;
52 import static org.junit.Assert.assertNotNull;
53 import static org.junit.Assert.assertTrue;
54 import static org.junit.Assert.fail;
55 import static org.mockito.Matchers.any;
56 import static org.mockito.Matchers.anyString;
57 import static org.mockito.Mockito.argThat;
58 import static org.mockito.Mockito.doReturn;
59 import static org.mockito.Mockito.doThrow;
60 import static org.mockito.Mockito.eq;
61 import static org.mockito.Mockito.isA;
62 import static org.mockito.Mockito.times;
63 import static org.mockito.Mockito.verify;
64 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
65 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
66 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
67
68 @SuppressWarnings("resource")
69 public class TransactionProxyTest extends AbstractActorTest {
70
71     @SuppressWarnings("serial")
72     static class TestException extends RuntimeException {
73     }
74
75     static interface Invoker {
76         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
77     }
78
79     private final Configuration configuration = new MockConfiguration();
80
81     @Mock
82     private ActorContext mockActorContext;
83
84     private SchemaContext schemaContext;
85
86     String memberName = "mock-member";
87
88     @Before
89     public void setUp(){
90         MockitoAnnotations.initMocks(this);
91
92         schemaContext = TestModel.createTestContext();
93
94         doReturn(getSystem()).when(mockActorContext).getActorSystem();
95         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
96         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
97
98         ShardStrategyFactory.setConfiguration(configuration);
99     }
100
101     private CreateTransaction eqCreateTransaction(final String memberName,
102             final TransactionType type) {
103         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
104             @Override
105             public boolean matches(Object argument) {
106                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
107                 return obj.getTransactionId().startsWith(memberName) &&
108                        obj.getTransactionType() == type.ordinal();
109             }
110         };
111
112         return argThat(matcher);
113     }
114
115     private DataExists eqDataExists() {
116         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
117             @Override
118             public boolean matches(Object argument) {
119                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
120                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
121             }
122         };
123
124         return argThat(matcher);
125     }
126
127     private ReadData eqReadData() {
128         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
129             @Override
130             public boolean matches(Object argument) {
131                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
132                        ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
133             }
134         };
135
136         return argThat(matcher);
137     }
138
139     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
140         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
141             @Override
142             public boolean matches(Object argument) {
143                 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
144                     return false;
145                 }
146
147                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
148                 return obj.getPath().equals(TestModel.TEST_PATH) &&
149                        obj.getData().equals(nodeToWrite);
150             }
151         };
152
153         return argThat(matcher);
154     }
155
156     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
157         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
158             @Override
159             public boolean matches(Object argument) {
160                 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
161                     return false;
162                 }
163
164                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
165                 return obj.getPath().equals(TestModel.TEST_PATH) &&
166                        obj.getData().equals(nodeToWrite);
167             }
168         };
169
170         return argThat(matcher);
171     }
172
173     private DeleteData eqDeleteData() {
174         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
175             @Override
176             public boolean matches(Object argument) {
177                 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178                        DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
179             }
180         };
181
182         return argThat(matcher);
183     }
184
185     private Future<Object> readyTxReply(ActorPath path) {
186         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
187     }
188
189     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
190         return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
191     }
192
193     private Future<Object> dataExistsReply(boolean exists) {
194         return Futures.successful(new DataExistsReply(exists).toSerializable());
195     }
196
197     private Future<Object> writeDataReply() {
198         return Futures.successful(new WriteDataReply().toSerializable());
199     }
200
201     private Future<Object> mergeDataReply() {
202         return Futures.successful(new MergeDataReply().toSerializable());
203     }
204
205     private Future<Object> deleteDataReply() {
206         return Futures.successful(new DeleteDataReply().toSerializable());
207     }
208
209     private ActorSelection actorSelection(ActorRef actorRef) {
210         return getSystem().actorSelection(actorRef.path());
211     }
212
213     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
214         return CreateTransactionReply.newBuilder()
215             .setTransactionActorPath(actorRef.path().toString())
216             .setTransactionId("txn-1").build();
217     }
218
219     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
220         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
221         doReturn(getSystem().actorSelection(actorRef.path())).
222                 when(mockActorContext).actorSelection(actorRef.path().toString());
223
224         doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
225                 when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
226
227         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228                 executeOperation(eq(getSystem().actorSelection(actorRef.path())),
229                         eqCreateTransaction(memberName, type));
230
231         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
232                 anyString(), eq(actorRef.path().toString()));
233
234         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
235
236         return actorRef;
237     }
238
239     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
240             throws Throwable {
241
242         try {
243             future.checkedGet(5, TimeUnit.SECONDS);
244             fail("Expected ReadFailedException");
245         } catch(ReadFailedException e) {
246             throw e.getCause();
247         }
248     }
249
250     @Test
251     public void testRead() throws Exception {
252         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
253
254         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
255                 READ_ONLY);
256
257         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
258                 eq(actorSelection(actorRef)), eqReadData());
259
260         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
261                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
262
263         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
264
265         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
266
267         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
268                 eq(actorSelection(actorRef)), eqReadData());
269
270         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
271
272         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
273
274         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
275     }
276
277     @Test(expected = ReadFailedException.class)
278     public void testReadWithInvalidReplyMessageType() throws Exception {
279         setupActorContextWithInitialCreateTransaction(READ_ONLY);
280
281         doReturn(Futures.successful(new Object())).when(mockActorContext).
282                 executeOperationAsync(any(ActorSelection.class), any());
283
284         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
285                 READ_ONLY);
286
287         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
288     }
289
290     @Test(expected = TestException.class)
291     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
292         setupActorContextWithInitialCreateTransaction(READ_ONLY);
293
294         doReturn(Futures.failed(new TestException())).when(mockActorContext).
295                 executeOperationAsync(any(ActorSelection.class), any());
296
297         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
298                 READ_ONLY);
299
300         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
301     }
302
303     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
304             throws Throwable {
305         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
306
307         if (exToThrow instanceof PrimaryNotFoundException) {
308             doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
309         } else {
310             doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
311                     when(mockActorContext).findPrimaryShard(anyString());
312         }
313         doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
314
315         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
316
317         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
318     }
319
320     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
321         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
322             @Override
323             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
324                 return proxy.read(TestModel.TEST_PATH);
325             }
326         });
327     }
328
329     @Test(expected = PrimaryNotFoundException.class)
330     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
331         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
332     }
333
334     @Test(expected = TimeoutException.class)
335     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
336         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
337                 new Exception("reason")));
338     }
339
340     @Test(expected = TestException.class)
341     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
342         testReadWithExceptionOnInitialCreateTransaction(new TestException());
343     }
344
345     @Test(expected = TestException.class)
346     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
347         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
348
349         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
350
351         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
352                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
353
354         doReturn(Futures.failed(new TestException())).when(mockActorContext).
355                 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
356
357         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
358                 eq(actorSelection(actorRef)), eqReadData());
359
360         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
361                 READ_WRITE);
362
363         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
364
365         transactionProxy.delete(TestModel.TEST_PATH);
366
367         try {
368             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
369         } finally {
370             verify(mockActorContext, times(0)).executeOperationAsync(
371                     eq(actorSelection(actorRef)), eqReadData());
372         }
373     }
374
375     @Test
376     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
377         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
378
379         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
380
381         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
382                 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
383
384         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
385                 eq(actorSelection(actorRef)), eqReadData());
386
387         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
388                 READ_WRITE);
389
390         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
391
392         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
393                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
394
395         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
396
397         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
398     }
399
400     @Test(expected=IllegalStateException.class)
401     public void testReadPreConditionCheck() {
402
403         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
404                 WRITE_ONLY);
405
406         transactionProxy.read(TestModel.TEST_PATH);
407     }
408
409     @Test
410     public void testExists() throws Exception {
411         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
412
413         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
414                 READ_ONLY);
415
416         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
417                 eq(actorSelection(actorRef)), eqDataExists());
418
419         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
420
421         assertEquals("Exists response", false, exists);
422
423         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
424                 eq(actorSelection(actorRef)), eqDataExists());
425
426         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
427
428         assertEquals("Exists response", true, exists);
429     }
430
431     @Test(expected = PrimaryNotFoundException.class)
432     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
433         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
434             @Override
435             public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
436                 return proxy.exists(TestModel.TEST_PATH);
437             }
438         });
439     }
440
441     @Test(expected = ReadFailedException.class)
442     public void testExistsWithInvalidReplyMessageType() throws Exception {
443         setupActorContextWithInitialCreateTransaction(READ_ONLY);
444
445         doReturn(Futures.successful(new Object())).when(mockActorContext).
446                 executeOperationAsync(any(ActorSelection.class), any());
447
448         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
449                 READ_ONLY);
450
451         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
452     }
453
454     @Test(expected = TestException.class)
455     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
456         setupActorContextWithInitialCreateTransaction(READ_ONLY);
457
458         doReturn(Futures.failed(new TestException())).when(mockActorContext).
459                 executeOperationAsync(any(ActorSelection.class), any());
460
461         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
462                 READ_ONLY);
463
464         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
465     }
466
467     @Test(expected = TestException.class)
468     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
469         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
470
471         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472
473         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
474                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
475
476         doReturn(Futures.failed(new TestException())).when(mockActorContext).
477                 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
478
479         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
480                 eq(actorSelection(actorRef)), eqDataExists());
481
482         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
483                 READ_WRITE);
484
485         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
486
487         transactionProxy.delete(TestModel.TEST_PATH);
488
489         try {
490             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
491         } finally {
492             verify(mockActorContext, times(0)).executeOperationAsync(
493                     eq(actorSelection(actorRef)), eqDataExists());
494         }
495     }
496
497     @Test
498     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
499         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
500
501         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502
503         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
504                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
505
506         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
507                 eq(actorSelection(actorRef)), eqDataExists());
508
509         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
510                 READ_WRITE);
511
512         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
513
514         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
515
516         assertEquals("Exists response", true, exists);
517     }
518
519     @Test(expected=IllegalStateException.class)
520     public void testxistsPreConditionCheck() {
521
522         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
523                 WRITE_ONLY);
524
525         transactionProxy.exists(TestModel.TEST_PATH);
526     }
527
528     private void verifyRecordingOperationFutures(List<Future<Object>> futures,
529             Class<?>... expResultTypes) throws Exception {
530         assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
531
532         int i = 0;
533         for( Future<Object> future: futures) {
534             assertNotNull("Recording operation Future is null", future);
535
536             Class<?> expResultType = expResultTypes[i++];
537             if(Throwable.class.isAssignableFrom(expResultType)) {
538                 try {
539                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
540                     fail("Expected exception from recording operation Future");
541                 } catch(Exception e) {
542                     // Expected
543                 }
544             } else {
545                 assertEquals("Recording operation Future result type", expResultType,
546                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
547             }
548         }
549     }
550
551     @Test
552     public void testWrite() throws Exception {
553         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
554
555         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
556
557         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
558                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
559
560         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
561                 WRITE_ONLY);
562
563         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
564
565         verify(mockActorContext).executeOperationAsync(
566                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
567
568         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
569                 WriteDataReply.SERIALIZABLE_CLASS);
570     }
571
572     @Test(expected=IllegalStateException.class)
573     public void testWritePreConditionCheck() {
574
575         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
576                 READ_ONLY);
577
578         transactionProxy.write(TestModel.TEST_PATH,
579                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
580     }
581
582     @Test(expected=IllegalStateException.class)
583     public void testWriteAfterReadyPreConditionCheck() {
584
585         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
586                 WRITE_ONLY);
587
588         transactionProxy.ready();
589
590         transactionProxy.write(TestModel.TEST_PATH,
591                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
592     }
593
594     @Test
595     public void testMerge() throws Exception {
596         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
597
598         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
599
600         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
601                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
602
603         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
604                 WRITE_ONLY);
605
606         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
607
608         verify(mockActorContext).executeOperationAsync(
609                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
610
611         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
612                 MergeDataReply.SERIALIZABLE_CLASS);
613     }
614
615     @Test
616     public void testDelete() throws Exception {
617         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
618
619         doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
620                 eq(actorSelection(actorRef)), eqDeleteData());
621
622         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
623                 WRITE_ONLY);
624
625         transactionProxy.delete(TestModel.TEST_PATH);
626
627         verify(mockActorContext).executeOperationAsync(
628                 eq(actorSelection(actorRef)), eqDeleteData());
629
630         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
631                 DeleteDataReply.SERIALIZABLE_CLASS);
632     }
633
634     private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
635             Object... expReplies) throws Exception {
636         assertEquals("getReadyOperationFutures size", expReplies.length,
637                 proxy.getCohortPathFutures().size());
638
639         int i = 0;
640         for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
641             assertNotNull("Ready operation Future is null", future);
642
643             Object expReply = expReplies[i++];
644             if(expReply instanceof ActorPath) {
645                 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
646                 assertEquals("Cohort actor path", expReply, actual);
647             } else {
648                 // Expecting exception.
649                 try {
650                     Await.result(future, Duration.create(5, TimeUnit.SECONDS));
651                     fail("Expected exception from ready operation Future");
652                 } catch(Exception e) {
653                     // Expected
654                 }
655             }
656         }
657     }
658
659     @SuppressWarnings("unchecked")
660     @Test
661     public void testReady() throws Exception {
662         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
663
664         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
665
666         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
667                 eq(actorSelection(actorRef)), eqReadData());
668
669         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
670                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
671
672         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
673                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
674
675         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
676                 READ_WRITE);
677
678         transactionProxy.read(TestModel.TEST_PATH);
679
680         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
681
682         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
683
684         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
685
686         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
687
688         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
689                 WriteDataReply.SERIALIZABLE_CLASS);
690
691         verifyCohortPathFutures(proxy, actorRef.path());
692     }
693
694     @SuppressWarnings("unchecked")
695     @Test
696     public void testReadyWithRecordingOperationFailure() throws Exception {
697         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
698
699         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
700
701         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
702                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
703
704         doReturn(Futures.failed(new TestException())).when(mockActorContext).
705                 executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
706
707         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
708                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
709
710         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
711                 WRITE_ONLY);
712
713         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
714
715         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
716
717         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
718
719         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
720
721         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
722
723         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
724                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
725
726         verifyCohortPathFutures(proxy, TestException.class);
727     }
728
729     @SuppressWarnings("unchecked")
730     @Test
731     public void testReadyWithReplyFailure() throws Exception {
732         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
733
734         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
735
736         doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
737                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
738
739         doReturn(Futures.failed(new TestException())).when(mockActorContext).
740                 executeOperationAsync(eq(actorSelection(actorRef)),
741                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
742
743         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
744                 WRITE_ONLY);
745
746         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
747
748         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
749
750         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
751
752         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
753
754         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
755                 MergeDataReply.SERIALIZABLE_CLASS);
756
757         verifyCohortPathFutures(proxy, TestException.class);
758     }
759
760     @Test
761     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
762
763         doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
764 //        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
765 //                anyString(), any());
766
767         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
768                 WRITE_ONLY);
769
770         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
771
772         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
773
774         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
775
776         transactionProxy.delete(TestModel.TEST_PATH);
777
778         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
779
780         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
781
782         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
783
784         verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
785     }
786
787     @SuppressWarnings("unchecked")
788     @Test
789     public void testReadyWithInvalidReplyMessageType() throws Exception {
790         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
791
792         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
793
794         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
795                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
796
797         doReturn(Futures.successful(new Object())).when(mockActorContext).
798                 executeOperationAsync(eq(actorSelection(actorRef)),
799                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
800
801         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
802                 WRITE_ONLY);
803
804         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
805
806         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
807
808         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
809
810         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
811
812         verifyCohortPathFutures(proxy, IllegalArgumentException.class);
813     }
814
815     @Test
816     public void testGetIdentifier() {
817         setupActorContextWithInitialCreateTransaction(READ_ONLY);
818         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
819                 TransactionProxy.TransactionType.READ_ONLY);
820
821         Object id = transactionProxy.getIdentifier();
822         assertNotNull("getIdentifier returned null", id);
823         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
824     }
825
826     @SuppressWarnings("unchecked")
827     @Test
828     public void testClose() throws Exception{
829         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
830
831         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
832                 eq(actorSelection(actorRef)), eqReadData());
833
834         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
835                 READ_WRITE);
836
837         transactionProxy.read(TestModel.TEST_PATH);
838
839         transactionProxy.close();
840
841         verify(mockActorContext).sendOperationAsync(
842                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
843     }
844 }