Merge "Be sure to shutdown instance when destroyed"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8
9 import com.google.common.util.concurrent.ListeningExecutorService;
10 import com.google.common.util.concurrent.MoreExecutors;
11
12 import org.junit.BeforeClass;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41
42 import scala.concurrent.duration.Duration;
43
44 import java.util.Collections;
45 import java.util.concurrent.TimeUnit;
46
47 import static org.junit.Assert.assertEquals;
48 import static org.junit.Assert.assertTrue;
49
50 public class ShardTransactionTest extends AbstractActorTest {
51     private static ListeningExecutorService storeExecutor =
52         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
53
54     private static final InMemoryDOMDataStore store =
55         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
56
57     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
58
59     private static final ShardIdentifier SHARD_IDENTIFIER =
60         ShardIdentifier.builder().memberName("member-1")
61             .shardName("inventory").type("config").build();
62
63     private DatastoreContext datastoreContext = new DatastoreContext();
64
65     @BeforeClass
66     public static void staticSetup() {
67         store.onGlobalContextUpdated(testSchemaContext);
68     }
69
70     @Test
71     public void testOnReceiveReadData() throws Exception {
72         new JavaTestKit(getSystem()) {{
73             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
74                     Collections.EMPTY_MAP, new DatastoreContext()));
75             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
76                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
77             final ActorRef subject = getSystem().actorOf(props, "testReadData");
78
79             new Within(duration("1 seconds")) {
80                 @Override
81                 protected void run() {
82
83                     subject.tell(
84                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
85                         getRef());
86
87                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
88                         // do not put code outside this method, will run afterwards
89                         @Override
90                         protected String match(Object in) {
91                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
92                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
93                                   .getNormalizedNode()!= null) {
94                                     return "match";
95                                 }
96                                 return null;
97                             } else {
98                                 throw noMatch();
99                             }
100                         }
101                     }.get(); // this extracts the received message
102
103                     assertEquals("match", out);
104
105                     expectNoMsg();
106                 }
107
108
109             };
110         }};
111     }
112
113     @Test
114     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
115         new JavaTestKit(getSystem()) {{
116             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
117                     Collections.EMPTY_MAP, new DatastoreContext()));
118             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
119                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
120             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
121
122             new Within(duration("1 seconds")) {
123                 @Override
124                 protected void run() {
125
126                     subject.tell(
127                         new ReadData(TestModel.TEST_PATH).toSerializable(),
128                         getRef());
129
130                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
131                         // do not put code outside this method, will run afterwards
132                         @Override
133                         protected String match(Object in) {
134                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
135                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
136                                     .getNormalizedNode()
137                                     == null) {
138                                     return "match";
139                                 }
140                                 return null;
141                             } else {
142                                 throw noMatch();
143                             }
144                         }
145                     }.get(); // this extracts the received message
146
147                     assertEquals("match", out);
148
149                     expectNoMsg();
150                 }
151
152
153             };
154         }};
155     }
156
157     @Test
158     public void testOnReceiveDataExistsPositive() throws Exception {
159         new JavaTestKit(getSystem()) {{
160             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
161                     Collections.EMPTY_MAP, new DatastoreContext()));
162             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
163                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
164             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
165
166             new Within(duration("1 seconds")) {
167                 @Override
168                 protected void run() {
169
170                     subject.tell(
171                         new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
172                         getRef());
173
174                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
175                         // do not put code outside this method, will run afterwards
176                         @Override
177                         protected String match(Object in) {
178                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
179                                 if (DataExistsReply.fromSerializable(in)
180                                     .exists()) {
181                                     return "match";
182                                 }
183                                 return null;
184                             } else {
185                                 throw noMatch();
186                             }
187                         }
188                     }.get(); // this extracts the received message
189
190                     assertEquals("match", out);
191
192                     expectNoMsg();
193                 }
194
195
196             };
197         }};
198     }
199
200     @Test
201     public void testOnReceiveDataExistsNegative() throws Exception {
202         new JavaTestKit(getSystem()) {{
203             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
204                     Collections.EMPTY_MAP, new DatastoreContext()));
205             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
206                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
207             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
208
209             new Within(duration("1 seconds")) {
210                 @Override
211                 protected void run() {
212
213                     subject.tell(
214                         new DataExists(TestModel.TEST_PATH).toSerializable(),
215                         getRef());
216
217                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
218                         // do not put code outside this method, will run afterwards
219                         @Override
220                         protected String match(Object in) {
221                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
222                                 if (!DataExistsReply.fromSerializable(in)
223                                     .exists()) {
224                                     return "match";
225                                 }
226                                 return null;
227                             } else {
228                                 throw noMatch();
229                             }
230                         }
231                     }.get(); // this extracts the received message
232
233                     assertEquals("match", out);
234
235                     expectNoMsg();
236                 }
237
238
239             };
240         }};
241     }
242
243     private void assertModification(final ActorRef subject,
244         final Class<? extends Modification> modificationType) {
245         new JavaTestKit(getSystem()) {{
246             new Within(duration("1 seconds")) {
247                 @Override
248                 protected void run() {
249                     subject
250                         .tell(new ShardTransaction.GetCompositedModification(),
251                             getRef());
252
253                     final CompositeModification compositeModification =
254                         new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
255                             // do not put code outside this method, will run afterwards
256                             @Override
257                             protected CompositeModification match(Object in) {
258                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
259                                     return ((ShardTransaction.GetCompositeModificationReply) in)
260                                         .getModification();
261                                 } else {
262                                     throw noMatch();
263                                 }
264                             }
265                         }.get(); // this extracts the received message
266
267                     assertTrue(
268                         compositeModification.getModifications().size() == 1);
269                     assertEquals(modificationType,
270                         compositeModification.getModifications().get(0)
271                             .getClass());
272
273                 }
274             };
275         }};
276     }
277
278     @Test
279     public void testOnReceiveWriteData() throws Exception {
280         new JavaTestKit(getSystem()) {{
281             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
282                     Collections.EMPTY_MAP, new DatastoreContext()));
283             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
284                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
285             final ActorRef subject =
286                 getSystem().actorOf(props, "testWriteData");
287
288             new Within(duration("1 seconds")) {
289                 @Override
290                 protected void run() {
291
292                     subject.tell(new WriteData(TestModel.TEST_PATH,
293                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
294                         getRef());
295
296                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
297                         // do not put code outside this method, will run afterwards
298                         @Override
299                         protected String match(Object in) {
300                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
301                                 return "match";
302                             } else {
303                                 throw noMatch();
304                             }
305                         }
306                     }.get(); // this extracts the received message
307
308                     assertEquals("match", out);
309
310                     assertModification(subject, WriteModification.class);
311                     expectNoMsg();
312                 }
313
314
315             };
316         }};
317     }
318
319     @Test
320     public void testOnReceiveMergeData() throws Exception {
321         new JavaTestKit(getSystem()) {{
322             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
323                     Collections.EMPTY_MAP, new DatastoreContext()));
324             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
325                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
326             final ActorRef subject =
327                 getSystem().actorOf(props, "testMergeData");
328
329             new Within(duration("1 seconds")) {
330                 @Override
331                 protected void run() {
332
333                     subject.tell(new MergeData(TestModel.TEST_PATH,
334                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
335                         getRef());
336
337                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
338                         // do not put code outside this method, will run afterwards
339                         @Override
340                         protected String match(Object in) {
341                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
342                                 return "match";
343                             } else {
344                                 throw noMatch();
345                             }
346                         }
347                     }.get(); // this extracts the received message
348
349                     assertEquals("match", out);
350
351                     assertModification(subject, MergeModification.class);
352
353                     expectNoMsg();
354                 }
355
356
357             };
358         }};
359     }
360
361     @Test
362     public void testOnReceiveDeleteData() throws Exception {
363         new JavaTestKit(getSystem()) {{
364             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
365                     Collections.EMPTY_MAP, new DatastoreContext()));
366             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
367                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
368             final ActorRef subject =
369                 getSystem().actorOf(props, "testDeleteData");
370
371             new Within(duration("1 seconds")) {
372                 @Override
373                 protected void run() {
374
375                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
376
377                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
378                         // do not put code outside this method, will run afterwards
379                         @Override
380                         protected String match(Object in) {
381                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
382                                 return "match";
383                             } else {
384                                 throw noMatch();
385                             }
386                         }
387                     }.get(); // this extracts the received message
388
389                     assertEquals("match", out);
390
391                     assertModification(subject, DeleteModification.class);
392                     expectNoMsg();
393                 }
394
395
396             };
397         }};
398     }
399
400
401     @Test
402     public void testOnReceiveReadyTransaction() throws Exception {
403         new JavaTestKit(getSystem()) {{
404             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
405                     Collections.EMPTY_MAP, new DatastoreContext()));
406             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
407                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
408             final ActorRef subject =
409                 getSystem().actorOf(props, "testReadyTransaction");
410
411             new Within(duration("1 seconds")) {
412                 @Override
413                 protected void run() {
414
415                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
416
417                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
418                         // do not put code outside this method, will run afterwards
419                         @Override
420                         protected String match(Object in) {
421                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
422                                 return "match";
423                             } else {
424                                 throw noMatch();
425                             }
426                         }
427                     }.get(); // this extracts the received message
428
429                     assertEquals("match", out);
430
431                     expectNoMsg();
432                 }
433
434
435             };
436         }};
437
438     }
439
440     @Test
441     public void testOnReceiveCloseTransaction() throws Exception {
442         new JavaTestKit(getSystem()) {{
443             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
444                     Collections.EMPTY_MAP, new DatastoreContext()));
445             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
446                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
447             final ActorRef subject =
448                 getSystem().actorOf(props, "testCloseTransaction");
449
450             watch(subject);
451
452             new Within(duration("6 seconds")) {
453                 @Override
454                 protected void run() {
455
456                     subject.tell(new CloseTransaction().toSerializable(), getRef());
457
458                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
459                         // do not put code outside this method, will run afterwards
460                         @Override
461                         protected String match(Object in) {
462                             System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
463                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
464                                 return "match";
465                             } else {
466                                 throw noMatch();
467                             }
468                         }
469                     }.get(); // this extracts the received message
470
471                     assertEquals("match", out);
472
473                     final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
474                         // do not put code outside this method, will run afterwards
475                         @Override
476                         protected String match(Object in) {
477                             System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
478                             if (in instanceof Terminated) {
479                                 return "match";
480                             } else {
481                                 throw noMatch();
482                             }
483                         }
484                     }.get(); // this extracts the received message
485
486                     assertEquals("match", termination);
487                 }
488             };
489         }};
490     }
491
492     @Test(expected=UnknownMessageException.class)
493     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
494         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
495                 Collections.EMPTY_MAP, new DatastoreContext()));
496         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
497                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
498         final TestActorRef subject = TestActorRef.apply(props,getSystem());
499
500         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
501     }
502
503     @Test
504     public void testShardTransactionInactivity() {
505
506         datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
507                 Duration.create(500, TimeUnit.MILLISECONDS));
508
509         new JavaTestKit(getSystem()) {{
510             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
511                     Collections.EMPTY_MAP, new DatastoreContext()));
512             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
513                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
514             final ActorRef subject =
515                 getSystem().actorOf(props, "testShardTransactionInactivity");
516
517             watch(subject);
518
519             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
520
521             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
522                 // do not put code outside this method, will run afterwards
523                 @Override
524                 protected String match(Object in) {
525                     if (in instanceof Terminated) {
526                         return "match";
527                     } else {
528                         throw noMatch();
529                     }
530                 }
531             }.get(); // this extracts the received message
532
533             assertEquals("match", termination);
534         }};
535     }
536 }