Merge "BUG 932 - Swagger HTTP POST contains incorrect object"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Optional;
13
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.mockito.ArgumentMatcher;
17 import org.mockito.Mock;
18 import org.mockito.MockitoAnnotations;
19
20 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
21 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
23
24 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
25 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.FiniteDuration;
53
54 import java.util.Arrays;
55 import java.util.concurrent.TimeUnit;
56
57 import static org.mockito.Matchers.any;
58 import static org.mockito.Matchers.anyString;
59 import static org.mockito.Mockito.doReturn;
60 import static org.mockito.Mockito.doThrow;
61 import static org.mockito.Mockito.argThat;
62 import static org.mockito.Mockito.eq;
63 import static org.mockito.Mockito.verify;
64 import static org.mockito.Mockito.isA;
65
66 @SuppressWarnings("resource")
67 public class TransactionProxyTest extends AbstractActorTest {
68
69     @SuppressWarnings("serial")
70     static class TestException extends RuntimeException {
71     }
72
73     static interface Invoker {
74         void invoke(TransactionProxy proxy) throws Exception;
75     }
76
77     private final Configuration configuration = new MockConfiguration();
78
79     @Mock
80     private ActorContext mockActorContext;
81
82     private SchemaContext schemaContext;
83
84     String memberName = "mock-member";
85
86     @Before
87     public void setUp(){
88         MockitoAnnotations.initMocks(this);
89
90         schemaContext = TestModel.createTestContext();
91
92         doReturn(getSystem()).when(mockActorContext).getActorSystem();
93
94         ShardStrategyFactory.setConfiguration(configuration);
95     }
96
97     private CreateTransaction eqCreateTransaction(final String memberName,
98             final TransactionType type) {
99         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
100             @Override
101             public boolean matches(Object argument) {
102                 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
103                 return obj.getTransactionId().startsWith(memberName) &&
104                        obj.getTransactionType() == type.ordinal();
105             }
106         };
107
108         return argThat(matcher);
109     }
110
111     private DataExists eqDataExists() {
112         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
113             @Override
114             public boolean matches(Object argument) {
115                 DataExists obj = DataExists.fromSerializable(argument);
116                 return obj.getPath().equals(TestModel.TEST_PATH);
117             }
118         };
119
120         return argThat(matcher);
121     }
122
123     private ReadData eqReadData() {
124         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
125             @Override
126             public boolean matches(Object argument) {
127                 ReadData obj = ReadData.fromSerializable(argument);
128                 return obj.getPath().equals(TestModel.TEST_PATH);
129             }
130         };
131
132         return argThat(matcher);
133     }
134
135     private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
136         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
137             @Override
138             public boolean matches(Object argument) {
139                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
140                 return obj.getPath().equals(TestModel.TEST_PATH) &&
141                        obj.getData().equals(nodeToWrite);
142             }
143         };
144
145         return argThat(matcher);
146     }
147
148     private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
149         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
150             @Override
151             public boolean matches(Object argument) {
152                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
153                 return obj.getPath().equals(TestModel.TEST_PATH) &&
154                        obj.getData().equals(nodeToWrite);
155             }
156         };
157
158         return argThat(matcher);
159     }
160
161     private DeleteData eqDeleteData() {
162         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
163             @Override
164             public boolean matches(Object argument) {
165                 DeleteData obj = DeleteData.fromSerializable(argument);
166                 return obj.getPath().equals(TestModel.TEST_PATH);
167             }
168         };
169
170         return argThat(matcher);
171     }
172
173     private Object readyTxReply(ActorPath path) {
174         return new ReadyTransactionReply(path).toSerializable();
175     }
176
177     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
178         return Futures.successful(new ReadDataReply(schemaContext, data)
179                 .toSerializable());
180     }
181
182     private Future<Object> dataExistsReply(boolean exists) {
183         return Futures.successful(new DataExistsReply(exists).toSerializable());
184     }
185
186     private ActorSelection actorSelection(ActorRef actorRef) {
187         return getSystem().actorSelection(actorRef.path());
188     }
189
190     private FiniteDuration anyDuration() {
191         return any(FiniteDuration.class);
192     }
193
194     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
195         return CreateTransactionReply.newBuilder()
196             .setTransactionActorPath(actorRef.path().toString())
197             .setTransactionId("txn-1").build();
198     }
199
200     private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
201         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
202         doReturn(getSystem().actorSelection(actorRef.path())).
203                 when(mockActorContext).actorSelection(actorRef.path().toString());
204         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
205         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
206                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
207                         eqCreateTransaction(memberName, type), anyDuration());
208         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
209                 anyString(), eq(actorRef.path().toString()));
210         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
211
212         return actorRef;
213     }
214
215     @Test
216     public void testRead() throws Exception {
217         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
218
219         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
220                 READ_ONLY, schemaContext);
221
222         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
223                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
224
225         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
226                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
227
228         assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
229
230         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
231
232         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
233                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
234
235         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
236
237         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
238
239         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
240     }
241
242     @Test(expected = ReadFailedException.class)
243     public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
244         setupActorContextWithInitialCreateTransaction(READ_ONLY);
245
246         doReturn(Futures.successful(new Object())).when(mockActorContext).
247                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
248
249         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
250                 READ_ONLY, schemaContext);
251
252         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
253     }
254
255     @Test(expected = TestException.class)
256     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
257         setupActorContextWithInitialCreateTransaction(READ_ONLY);
258
259         doThrow(new TestException()).when(mockActorContext).
260                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
261
262         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
263                 READ_ONLY, schemaContext);
264
265         try {
266             transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
267             fail("Expected ReadFailedException");
268         } catch(ReadFailedException e) {
269             // Expected - throw cause - expects TestException.
270             throw e.getCause();
271         }
272     }
273
274     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
275             throws Throwable {
276
277         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
278                 anyString(), any(), anyDuration());
279
280         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
281                 READ_ONLY, schemaContext);
282
283         try {
284             invoker.invoke(transactionProxy);
285             fail("Expected ReadFailedException");
286         } catch(ReadFailedException e) {
287             // Expected - throw cause - expects TestException.
288             throw e.getCause();
289         }
290     }
291
292     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
293         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
294             @Override
295             public void invoke(TransactionProxy proxy) throws Exception {
296                 proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
297             }
298         });
299     }
300
301     @Test(expected = PrimaryNotFoundException.class)
302     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
303         testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
304     }
305
306     @Test(expected = TimeoutException.class)
307     public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
308         testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
309                 new Exception("reason")));
310     }
311
312     @Test(expected = TestException.class)
313     public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
314         testReadWithExceptionOnInitialCreateTransaction(new TestException());
315     }
316
317     @Test
318     public void testExists() throws Exception {
319         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
320
321         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
322                 READ_ONLY, schemaContext);
323
324         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
325                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
326
327         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
328
329         assertEquals("Exists response", false, exists);
330
331         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
332                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
333
334         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
335
336         assertEquals("Exists response", true, exists);
337     }
338
339     @Test(expected = PrimaryNotFoundException.class)
340     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
341         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
342             @Override
343             public void invoke(TransactionProxy proxy) throws Exception {
344                 proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
345             }
346         });
347     }
348
349     @Test(expected = ReadFailedException.class)
350     public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
351         setupActorContextWithInitialCreateTransaction(READ_ONLY);
352
353         doReturn(Futures.successful(new Object())).when(mockActorContext).
354                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
355
356         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
357                 READ_ONLY, schemaContext);
358
359         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
360     }
361
362     @Test(expected = TestException.class)
363     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
364         setupActorContextWithInitialCreateTransaction(READ_ONLY);
365
366         doThrow(new TestException()).when(mockActorContext).
367                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
368
369         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
370                 READ_ONLY, schemaContext);
371
372         try {
373             transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
374             fail("Expected ReadFailedException");
375         } catch(ReadFailedException e) {
376             // Expected - throw cause - expects TestException.
377             throw e.getCause();
378         }
379     }
380
381     @Test
382     public void testWrite() throws Exception {
383         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
384
385         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
386                 WRITE_ONLY, schemaContext);
387
388         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
389
390         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
391
392         verify(mockActorContext).sendRemoteOperationAsync(
393                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
394     }
395
396     @Test
397     public void testMerge() throws Exception {
398         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
399
400         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
401                 WRITE_ONLY, schemaContext);
402
403         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
404
405         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
406
407         verify(mockActorContext).sendRemoteOperationAsync(
408                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
409     }
410
411     @Test
412     public void testDelete() throws Exception {
413         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
414
415         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
416                 WRITE_ONLY, schemaContext);
417
418         transactionProxy.delete(TestModel.TEST_PATH);
419
420         verify(mockActorContext).sendRemoteOperationAsync(
421                 eq(actorSelection(actorRef)), eqDeleteData());
422     }
423
424     @SuppressWarnings("unchecked")
425     @Test
426     public void testReady() throws Exception {
427         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
428
429         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
430                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
431
432         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
433                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
434
435         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
436                 READ_WRITE, schemaContext);
437
438         transactionProxy.read(TestModel.TEST_PATH);
439
440         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
441
442         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
443
444         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
445
446         assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
447     }
448
449     @Test
450     public void testGetIdentifier() {
451         setupActorContextWithInitialCreateTransaction(READ_ONLY);
452         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
453                 TransactionProxy.TransactionType.READ_ONLY, schemaContext);
454
455         Object id = transactionProxy.getIdentifier();
456         assertNotNull("getIdentifier returned null", id);
457         assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
458     }
459
460     @SuppressWarnings("unchecked")
461     @Test
462     public void testClose() throws Exception{
463         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
464
465         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
466                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
467
468         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
469                 READ_WRITE, schemaContext);
470
471         transactionProxy.read(TestModel.TEST_PATH);
472
473         transactionProxy.close();
474
475         verify(mockActorContext).sendRemoteOperationAsync(
476                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
477     }
478 }