Merge "Fixed namespace of yang testing file."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10
11 import junit.framework.Assert;
12
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
21 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
22 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
25 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
28 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
29 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
37 import scala.concurrent.duration.FiniteDuration;
38
39 import java.util.List;
40 import java.util.concurrent.Executors;
41
42 import static junit.framework.Assert.fail;
43 import static org.mockito.Matchers.any;
44 import static org.mockito.Matchers.anyString;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.when;
47
48 public class TransactionProxyTest extends AbstractActorTest {
49
50     private final Configuration configuration = new MockConfiguration();
51
52     private final ActorContext testContext =
53         new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
54
55     private final ListeningExecutorService transactionExecutor =
56         MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
57
58     @Before
59     public void setUp(){
60         ShardStrategyFactory.setConfiguration(configuration);
61     }
62
63     @After
64     public void tearDown() {
65         transactionExecutor.shutdownNow();
66     }
67
68     @Test
69     public void testRead() throws Exception {
70         final Props props = Props.create(DoNothingActor.class);
71         final ActorRef actorRef = getSystem().actorOf(props);
72
73         final MockActorContext actorContext = new MockActorContext(this.getSystem());
74         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
75         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
76         actorContext.setExecuteRemoteOperationResponse("message");
77
78
79         TransactionProxy transactionProxy =
80             new TransactionProxy(actorContext,
81                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
82
83
84         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
85             transactionProxy.read(TestModel.TEST_PATH);
86
87         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
88
89         Assert.assertFalse(normalizedNodeOptional.isPresent());
90
91         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
92             TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
93
94         read = transactionProxy.read(TestModel.TEST_PATH);
95
96         normalizedNodeOptional = read.get();
97
98         Assert.assertTrue(normalizedNodeOptional.isPresent());
99     }
100
101     @Test
102     public void testReadWhenANullIsReturned() throws Exception {
103         final Props props = Props.create(DoNothingActor.class);
104         final ActorRef actorRef = getSystem().actorOf(props);
105
106         final MockActorContext actorContext = new MockActorContext(this.getSystem());
107         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
108         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
109         actorContext.setExecuteRemoteOperationResponse("message");
110
111         TransactionProxy transactionProxy =
112             new TransactionProxy(actorContext,
113                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
114
115
116         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
117             transactionProxy.read(TestModel.TEST_PATH);
118
119         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
120
121         Assert.assertFalse(normalizedNodeOptional.isPresent());
122
123         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
124            TestModel.createTestContext(), null).toSerializable());
125
126         read = transactionProxy.read(TestModel.TEST_PATH);
127
128         normalizedNodeOptional = read.get();
129
130         Assert.assertFalse(normalizedNodeOptional.isPresent());
131     }
132
133     @Test
134     public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
135         final ActorContext actorContext = mock(ActorContext.class);
136
137         when(actorContext.executeShardOperation(anyString(), any(), any(
138             FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
139
140         TransactionProxy transactionProxy =
141             new TransactionProxy(actorContext,
142                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
143
144
145         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
146             transactionProxy.read(TestModel.TEST_PATH);
147
148         Assert.assertFalse(read.get().isPresent());
149
150     }
151
152
153     @Test
154     public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
155         final ActorContext actorContext = mock(ActorContext.class);
156
157         when(actorContext.executeShardOperation(anyString(), any(), any(
158             FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
159
160         TransactionProxy transactionProxy =
161             new TransactionProxy(actorContext,
162                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
163
164
165         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
166             transactionProxy.read(TestModel.TEST_PATH);
167
168         Assert.assertFalse(read.get().isPresent());
169
170     }
171
172     @Test
173     public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
174         final ActorContext actorContext = mock(ActorContext.class);
175
176         when(actorContext.executeShardOperation(anyString(), any(), any(
177             FiniteDuration.class))).thenThrow(new NullPointerException());
178
179         TransactionProxy transactionProxy =
180             new TransactionProxy(actorContext,
181                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
182
183
184         try {
185             ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
186                 transactionProxy.read(TestModel.TEST_PATH);
187             fail("A null pointer exception was expected");
188         } catch(NullPointerException e){
189
190         }
191     }
192
193
194
195     @Test
196     public void testWrite() throws Exception {
197         final Props props = Props.create(MessageCollectorActor.class);
198         final ActorRef actorRef = getSystem().actorOf(props);
199
200         final MockActorContext actorContext = new MockActorContext(this.getSystem());
201         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
202         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
203         actorContext.setExecuteRemoteOperationResponse("message");
204
205         TransactionProxy transactionProxy =
206             new TransactionProxy(actorContext,
207                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
208
209         transactionProxy.write(TestModel.TEST_PATH,
210             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
211
212         Object messages = testContext
213             .executeLocalOperation(actorRef, "messages",
214                 ActorContext.ASK_DURATION);
215
216         Assert.assertNotNull(messages);
217
218         Assert.assertTrue(messages instanceof List);
219
220         List<Object> listMessages = (List<Object>) messages;
221
222         Assert.assertEquals(1, listMessages.size());
223
224         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
225     }
226
227     private Object createPrimaryFound(ActorRef actorRef) {
228         return new PrimaryFound(actorRef.path().toString()).toSerializable();
229     }
230
231     @Test
232     public void testMerge() throws Exception {
233         final Props props = Props.create(MessageCollectorActor.class);
234         final ActorRef actorRef = getSystem().actorOf(props);
235
236         final MockActorContext actorContext = new MockActorContext(this.getSystem());
237         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
238         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
239         actorContext.setExecuteRemoteOperationResponse("message");
240
241         TransactionProxy transactionProxy =
242             new TransactionProxy(actorContext,
243                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
244
245         transactionProxy.merge(TestModel.TEST_PATH,
246             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
247
248         Object messages = testContext
249             .executeLocalOperation(actorRef, "messages",
250                 ActorContext.ASK_DURATION);
251
252         Assert.assertNotNull(messages);
253
254         Assert.assertTrue(messages instanceof List);
255
256         List<Object> listMessages = (List<Object>) messages;
257
258         Assert.assertEquals(1, listMessages.size());
259
260         Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
261     }
262
263     @Test
264     public void testDelete() throws Exception {
265         final Props props = Props.create(MessageCollectorActor.class);
266         final ActorRef actorRef = getSystem().actorOf(props);
267
268         final MockActorContext actorContext = new MockActorContext(this.getSystem());
269         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
270         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
271         actorContext.setExecuteRemoteOperationResponse("message");
272
273         TransactionProxy transactionProxy =
274             new TransactionProxy(actorContext,
275                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
276
277         transactionProxy.delete(TestModel.TEST_PATH);
278
279         Object messages = testContext
280             .executeLocalOperation(actorRef, "messages",
281                 ActorContext.ASK_DURATION);
282
283         Assert.assertNotNull(messages);
284
285         Assert.assertTrue(messages instanceof List);
286
287         List<Object> listMessages = (List<Object>) messages;
288
289         Assert.assertEquals(1, listMessages.size());
290
291         Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
292     }
293
294     @Test
295     public void testReady() throws Exception {
296         final Props props = Props.create(DoNothingActor.class);
297         final ActorRef doNothingActorRef = getSystem().actorOf(props);
298
299         final MockActorContext actorContext = new MockActorContext(this.getSystem());
300         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
301         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
302         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
303
304         TransactionProxy transactionProxy =
305             new TransactionProxy(actorContext,
306                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
307
308
309         transactionProxy.read(TestModel.TEST_PATH);
310
311         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
312
313         Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
314
315         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
316
317         Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
318
319     }
320
321     @Test
322     public void testGetIdentifier(){
323         final Props props = Props.create(DoNothingActor.class);
324         final ActorRef doNothingActorRef = getSystem().actorOf(props);
325
326         final MockActorContext actorContext = new MockActorContext(this.getSystem());
327         actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
328
329         TransactionProxy transactionProxy =
330             new TransactionProxy(actorContext,
331                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
332
333         Assert.assertNotNull(transactionProxy.getIdentifier());
334     }
335
336     @Test
337     public void testClose(){
338         final Props props = Props.create(MessageCollectorActor.class);
339         final ActorRef actorRef = getSystem().actorOf(props);
340
341         final MockActorContext actorContext = new MockActorContext(this.getSystem());
342         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
343         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
344         actorContext.setExecuteRemoteOperationResponse("message");
345
346         TransactionProxy transactionProxy =
347             new TransactionProxy(actorContext,
348                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
349
350         transactionProxy.read(TestModel.TEST_PATH);
351
352         transactionProxy.close();
353
354         Object messages = testContext
355             .executeLocalOperation(actorRef, "messages",
356                 ActorContext.ASK_DURATION);
357
358         Assert.assertNotNull(messages);
359
360         Assert.assertTrue(messages instanceof List);
361
362         List<Object> listMessages = (List<Object>) messages;
363
364         Assert.assertEquals(1, listMessages.size());
365
366         Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
367     }
368
369     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
370         return CreateTransactionReply.newBuilder()
371             .setTransactionActorPath(actorRef.path().toString())
372             .setTransactionId("txn-1")
373             .build();
374     }
375 }