Merge "Issue fix for config subsystem"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import org.junit.BeforeClass;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
13 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
21 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
29 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
30 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
31 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import scala.concurrent.duration.Duration;
40 import java.util.Collections;
41 import java.util.concurrent.TimeUnit;
42 import static org.junit.Assert.assertEquals;
43 import static org.junit.Assert.assertTrue;
44
45 public class ShardTransactionTest extends AbstractActorTest {
46     private static ListeningExecutorService storeExecutor =
47         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
48
49     private static final InMemoryDOMDataStore store =
50         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
51
52     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
53
54     private static final ShardIdentifier SHARD_IDENTIFIER =
55         ShardIdentifier.builder().memberName("member-1")
56             .shardName("inventory").type("config").build();
57
58     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
59
60     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
61
62     @BeforeClass
63     public static void staticSetup() {
64         store.onGlobalContextUpdated(testSchemaContext);
65     }
66
67     private ActorRef createShard(){
68         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
69             Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
70     }
71
72     @Test
73     public void testOnReceiveReadData() throws Exception {
74         new JavaTestKit(getSystem()) {{
75             final ActorRef shard = createShard();
76             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
77                     testSchemaContext, datastoreContext, shardStats, "txn");
78             final ActorRef subject = getSystem().actorOf(props, "testReadData");
79
80             new Within(duration("1 seconds")) {
81                 @Override
82                 protected void run() {
83
84                     subject.tell(
85                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
86                         getRef());
87
88                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
89                         // do not put code outside this method, will run afterwards
90                         @Override
91                         protected String match(Object in) {
92                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
93                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
94                                   .getNormalizedNode()!= null) {
95                                     return "match";
96                                 }
97                                 return null;
98                             } else {
99                                 throw noMatch();
100                             }
101                         }
102                     }.get(); // this extracts the received message
103
104                     assertEquals("match", out);
105
106                     expectNoMsg();
107                 }
108
109
110             };
111         }};
112     }
113
114     @Test
115     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
116         new JavaTestKit(getSystem()) {{
117             final ActorRef shard = createShard();
118             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
119                     testSchemaContext, datastoreContext, shardStats, "txn");
120             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
121
122             new Within(duration("1 seconds")) {
123                 @Override
124                 protected void run() {
125
126                     subject.tell(
127                         new ReadData(TestModel.TEST_PATH).toSerializable(),
128                         getRef());
129
130                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
131                         // do not put code outside this method, will run afterwards
132                         @Override
133                         protected String match(Object in) {
134                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
135                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
136                                     .getNormalizedNode()
137                                     == null) {
138                                     return "match";
139                                 }
140                                 return null;
141                             } else {
142                                 throw noMatch();
143                             }
144                         }
145                     }.get(); // this extracts the received message
146
147                     assertEquals("match", out);
148
149                     expectNoMsg();
150                 }
151
152
153             };
154         }};
155     }
156
157     @Test
158     public void testOnReceiveDataExistsPositive() throws Exception {
159         new JavaTestKit(getSystem()) {{
160             final ActorRef shard = createShard();
161             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
162                     testSchemaContext, datastoreContext, shardStats, "txn");
163             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
164
165             new Within(duration("1 seconds")) {
166                 @Override
167                 protected void run() {
168
169                     subject.tell(
170                         new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
171                         getRef());
172
173                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
174                         // do not put code outside this method, will run afterwards
175                         @Override
176                         protected String match(Object in) {
177                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
178                                 if (DataExistsReply.fromSerializable(in)
179                                     .exists()) {
180                                     return "match";
181                                 }
182                                 return null;
183                             } else {
184                                 throw noMatch();
185                             }
186                         }
187                     }.get(); // this extracts the received message
188
189                     assertEquals("match", out);
190
191                     expectNoMsg();
192                 }
193
194
195             };
196         }};
197     }
198
199     @Test
200     public void testOnReceiveDataExistsNegative() throws Exception {
201         new JavaTestKit(getSystem()) {{
202             final ActorRef shard = createShard();
203             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
204                     testSchemaContext, datastoreContext, shardStats, "txn");
205             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
206
207             new Within(duration("1 seconds")) {
208                 @Override
209                 protected void run() {
210
211                     subject.tell(
212                         new DataExists(TestModel.TEST_PATH).toSerializable(),
213                         getRef());
214
215                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
216                         // do not put code outside this method, will run afterwards
217                         @Override
218                         protected String match(Object in) {
219                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
220                                 if (!DataExistsReply.fromSerializable(in)
221                                     .exists()) {
222                                     return "match";
223                                 }
224                                 return null;
225                             } else {
226                                 throw noMatch();
227                             }
228                         }
229                     }.get(); // this extracts the received message
230
231                     assertEquals("match", out);
232
233                     expectNoMsg();
234                 }
235
236
237             };
238         }};
239     }
240
241     private void assertModification(final ActorRef subject,
242         final Class<? extends Modification> modificationType) {
243         new JavaTestKit(getSystem()) {{
244             new Within(duration("3 seconds")) {
245                 @Override
246                 protected void run() {
247                     subject
248                         .tell(new ShardWriteTransaction.GetCompositedModification(),
249                             getRef());
250
251                     final CompositeModification compositeModification =
252                         new ExpectMsg<CompositeModification>(duration("3 seconds"), "match hint") {
253                             // do not put code outside this method, will run afterwards
254                             @Override
255                             protected CompositeModification match(Object in) {
256                                 if (in instanceof ShardWriteTransaction.GetCompositeModificationReply) {
257                                     return ((ShardWriteTransaction.GetCompositeModificationReply) in)
258                                         .getModification();
259                                 } else {
260                                     throw noMatch();
261                                 }
262                             }
263                         }.get(); // this extracts the received message
264
265                     assertTrue(
266                         compositeModification.getModifications().size() == 1);
267                     assertEquals(modificationType,
268                         compositeModification.getModifications().get(0)
269                             .getClass());
270
271                 }
272             };
273         }};
274     }
275
276     @Test
277     public void testOnReceiveWriteData() throws Exception {
278         new JavaTestKit(getSystem()) {{
279             final ActorRef shard = createShard();
280             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
281                     testSchemaContext, datastoreContext, shardStats, "txn");
282             final ActorRef subject =
283                 getSystem().actorOf(props, "testWriteData");
284
285             new Within(duration("1 seconds")) {
286                 @Override
287                 protected void run() {
288
289                     subject.tell(new WriteData(TestModel.TEST_PATH,
290                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
291                         getRef());
292
293                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
294                         // do not put code outside this method, will run afterwards
295                         @Override
296                         protected String match(Object in) {
297                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
298                                 return "match";
299                             } else {
300                                 throw noMatch();
301                             }
302                         }
303                     }.get(); // this extracts the received message
304
305                     assertEquals("match", out);
306
307                     assertModification(subject, WriteModification.class);
308                     expectNoMsg();
309                 }
310
311
312             };
313         }};
314     }
315
316     @Test
317     public void testOnReceiveMergeData() throws Exception {
318         new JavaTestKit(getSystem()) {{
319             final ActorRef shard = createShard();
320             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
321                     testSchemaContext, datastoreContext, shardStats, "txn");
322             final ActorRef subject =
323                 getSystem().actorOf(props, "testMergeData");
324
325             new Within(duration("1 seconds")) {
326                 @Override
327                 protected void run() {
328
329                     subject.tell(new MergeData(TestModel.TEST_PATH,
330                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
331                         getRef());
332
333                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
334                         // do not put code outside this method, will run afterwards
335                         @Override
336                         protected String match(Object in) {
337                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
338                                 return "match";
339                             } else {
340                                 throw noMatch();
341                             }
342                         }
343                     }.get(); // this extracts the received message
344
345                     assertEquals("match", out);
346
347                     assertModification(subject, MergeModification.class);
348
349                     expectNoMsg();
350                 }
351
352
353             };
354         }};
355     }
356
357     @Test
358     public void testOnReceiveDeleteData() throws Exception {
359         new JavaTestKit(getSystem()) {{
360             final ActorRef shard = createShard();
361             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
362                     testSchemaContext, datastoreContext, shardStats, "txn");
363             final ActorRef subject =
364                 getSystem().actorOf(props, "testDeleteData");
365
366             new Within(duration("1 seconds")) {
367                 @Override
368                 protected void run() {
369
370                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
371
372                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
373                         // do not put code outside this method, will run afterwards
374                         @Override
375                         protected String match(Object in) {
376                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
377                                 return "match";
378                             } else {
379                                 throw noMatch();
380                             }
381                         }
382                     }.get(); // this extracts the received message
383
384                     assertEquals("match", out);
385
386                     assertModification(subject, DeleteModification.class);
387                     expectNoMsg();
388                 }
389
390
391             };
392         }};
393     }
394
395
396     @Test
397     public void testOnReceiveReadyTransaction() throws Exception {
398         new JavaTestKit(getSystem()) {{
399             final ActorRef shard = createShard();
400             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
401                     testSchemaContext, datastoreContext, shardStats, "txn");
402             final ActorRef subject =
403                 getSystem().actorOf(props, "testReadyTransaction");
404
405             new Within(duration("1 seconds")) {
406                 @Override
407                 protected void run() {
408
409                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
410
411                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
412                         // do not put code outside this method, will run afterwards
413                         @Override
414                         protected String match(Object in) {
415                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
416                                 return "match";
417                             } else {
418                                 throw noMatch();
419                             }
420                         }
421                     }.get(); // this extracts the received message
422
423                     assertEquals("match", out);
424
425                     expectNoMsg();
426                 }
427
428
429             };
430         }};
431
432     }
433
434     @Test
435     public void testOnReceiveCloseTransaction() throws Exception {
436         new JavaTestKit(getSystem()) {{
437             final ActorRef shard = createShard();
438             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
439                     testSchemaContext, datastoreContext, shardStats, "txn");
440             final ActorRef subject =
441                 getSystem().actorOf(props, "testCloseTransaction");
442
443             watch(subject);
444
445             new Within(duration("6 seconds")) {
446                 @Override
447                 protected void run() {
448
449                     subject.tell(new CloseTransaction().toSerializable(), getRef());
450
451                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
452                         // do not put code outside this method, will run afterwards
453                         @Override
454                         protected String match(Object in) {
455                             System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
456                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
457                                 return "match";
458                             } else {
459                                 throw noMatch();
460                             }
461                         }
462                     }.get(); // this extracts the received message
463
464                     assertEquals("match", out);
465
466                     final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
467                         // do not put code outside this method, will run afterwards
468                         @Override
469                         protected String match(Object in) {
470                             System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
471                             if (in instanceof Terminated) {
472                                 return "match";
473                             } else {
474                                 throw noMatch();
475                             }
476                         }
477                     }.get(); // this extracts the received message
478
479                     assertEquals("match", termination);
480                 }
481             };
482         }};
483     }
484
485     @Test(expected=UnknownMessageException.class)
486     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
487         final ActorRef shard = createShard();
488         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
489                 testSchemaContext, datastoreContext, shardStats, "txn");
490         final TestActorRef subject = TestActorRef.apply(props,getSystem());
491
492         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
493     }
494
495     @Test
496     public void testShardTransactionInactivity() {
497
498         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
499                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
500
501         new JavaTestKit(getSystem()) {{
502             final ActorRef shard = createShard();
503             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
504                     testSchemaContext, datastoreContext, shardStats, "txn");
505             final ActorRef subject =
506                 getSystem().actorOf(props, "testShardTransactionInactivity");
507
508             watch(subject);
509
510             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
511
512             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
513                 // do not put code outside this method, will run afterwards
514                 @Override
515                 protected String match(Object in) {
516                     if (in instanceof Terminated) {
517                         return "match";
518                     } else {
519                         throw noMatch();
520                     }
521                 }
522             }.get(); // this extracts the received message
523
524             assertEquals("match", termination);
525         }};
526     }
527 }