Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8
9 import com.google.common.util.concurrent.ListeningExecutorService;
10 import com.google.common.util.concurrent.MoreExecutors;
11
12 import org.junit.BeforeClass;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
31 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
33 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
34 import org.opendaylight.controller.cluster.datastore.modification.Modification;
35 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42
43 import scala.concurrent.duration.Duration;
44
45 import java.util.Collections;
46 import java.util.concurrent.TimeUnit;
47
48 import static org.junit.Assert.assertEquals;
49 import static org.junit.Assert.assertTrue;
50
51 public class ShardTransactionTest extends AbstractActorTest {
52     private static ListeningExecutorService storeExecutor =
53         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
54
55     private static final InMemoryDOMDataStore store =
56         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
57
58     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59
60     private static final ShardIdentifier SHARD_IDENTIFIER =
61         ShardIdentifier.builder().memberName("member-1")
62             .shardName("inventory").type("config").build();
63
64     private DatastoreContext datastoreContext = new DatastoreContext();
65
66     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
67
68     @BeforeClass
69     public static void staticSetup() {
70         store.onGlobalContextUpdated(testSchemaContext);
71     }
72
73     private ActorRef createShard(){
74         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
75             Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
76     }
77
78     @Test
79     public void testOnReceiveReadData() throws Exception {
80         new JavaTestKit(getSystem()) {{
81             final ActorRef shard = createShard();
82             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
83                     testSchemaContext, datastoreContext, shardStats);
84             final ActorRef subject = getSystem().actorOf(props, "testReadData");
85
86             new Within(duration("1 seconds")) {
87                 @Override
88                 protected void run() {
89
90                     subject.tell(
91                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
92                         getRef());
93
94                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
95                         // do not put code outside this method, will run afterwards
96                         @Override
97                         protected String match(Object in) {
98                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
99                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
100                                   .getNormalizedNode()!= null) {
101                                     return "match";
102                                 }
103                                 return null;
104                             } else {
105                                 throw noMatch();
106                             }
107                         }
108                     }.get(); // this extracts the received message
109
110                     assertEquals("match", out);
111
112                     expectNoMsg();
113                 }
114
115
116             };
117         }};
118     }
119
120     @Test
121     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
122         new JavaTestKit(getSystem()) {{
123             final ActorRef shard = createShard();
124             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
125                     testSchemaContext, datastoreContext, shardStats);
126             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
127
128             new Within(duration("1 seconds")) {
129                 @Override
130                 protected void run() {
131
132                     subject.tell(
133                         new ReadData(TestModel.TEST_PATH).toSerializable(),
134                         getRef());
135
136                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
137                         // do not put code outside this method, will run afterwards
138                         @Override
139                         protected String match(Object in) {
140                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
141                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
142                                     .getNormalizedNode()
143                                     == null) {
144                                     return "match";
145                                 }
146                                 return null;
147                             } else {
148                                 throw noMatch();
149                             }
150                         }
151                     }.get(); // this extracts the received message
152
153                     assertEquals("match", out);
154
155                     expectNoMsg();
156                 }
157
158
159             };
160         }};
161     }
162
163     @Test
164     public void testOnReceiveDataExistsPositive() throws Exception {
165         new JavaTestKit(getSystem()) {{
166             final ActorRef shard = createShard();
167             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
168                     testSchemaContext, datastoreContext, shardStats);
169             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
170
171             new Within(duration("1 seconds")) {
172                 @Override
173                 protected void run() {
174
175                     subject.tell(
176                         new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
177                         getRef());
178
179                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
180                         // do not put code outside this method, will run afterwards
181                         @Override
182                         protected String match(Object in) {
183                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
184                                 if (DataExistsReply.fromSerializable(in)
185                                     .exists()) {
186                                     return "match";
187                                 }
188                                 return null;
189                             } else {
190                                 throw noMatch();
191                             }
192                         }
193                     }.get(); // this extracts the received message
194
195                     assertEquals("match", out);
196
197                     expectNoMsg();
198                 }
199
200
201             };
202         }};
203     }
204
205     @Test
206     public void testOnReceiveDataExistsNegative() throws Exception {
207         new JavaTestKit(getSystem()) {{
208             final ActorRef shard = createShard();
209             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
210                     testSchemaContext, datastoreContext, shardStats);
211             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
212
213             new Within(duration("1 seconds")) {
214                 @Override
215                 protected void run() {
216
217                     subject.tell(
218                         new DataExists(TestModel.TEST_PATH).toSerializable(),
219                         getRef());
220
221                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
222                         // do not put code outside this method, will run afterwards
223                         @Override
224                         protected String match(Object in) {
225                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
226                                 if (!DataExistsReply.fromSerializable(in)
227                                     .exists()) {
228                                     return "match";
229                                 }
230                                 return null;
231                             } else {
232                                 throw noMatch();
233                             }
234                         }
235                     }.get(); // this extracts the received message
236
237                     assertEquals("match", out);
238
239                     expectNoMsg();
240                 }
241
242
243             };
244         }};
245     }
246
247     private void assertModification(final ActorRef subject,
248         final Class<? extends Modification> modificationType) {
249         new JavaTestKit(getSystem()) {{
250             new Within(duration("1 seconds")) {
251                 @Override
252                 protected void run() {
253                     subject
254                         .tell(new ShardTransaction.GetCompositedModification(),
255                             getRef());
256
257                     final CompositeModification compositeModification =
258                         new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
259                             // do not put code outside this method, will run afterwards
260                             @Override
261                             protected CompositeModification match(Object in) {
262                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
263                                     return ((ShardTransaction.GetCompositeModificationReply) in)
264                                         .getModification();
265                                 } else {
266                                     throw noMatch();
267                                 }
268                             }
269                         }.get(); // this extracts the received message
270
271                     assertTrue(
272                         compositeModification.getModifications().size() == 1);
273                     assertEquals(modificationType,
274                         compositeModification.getModifications().get(0)
275                             .getClass());
276
277                 }
278             };
279         }};
280     }
281
282     @Test
283     public void testOnReceiveWriteData() throws Exception {
284         new JavaTestKit(getSystem()) {{
285             final ActorRef shard = createShard();
286             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
287                     testSchemaContext, datastoreContext, shardStats);
288             final ActorRef subject =
289                 getSystem().actorOf(props, "testWriteData");
290
291             new Within(duration("1 seconds")) {
292                 @Override
293                 protected void run() {
294
295                     subject.tell(new WriteData(TestModel.TEST_PATH,
296                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
297                         getRef());
298
299                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
300                         // do not put code outside this method, will run afterwards
301                         @Override
302                         protected String match(Object in) {
303                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
304                                 return "match";
305                             } else {
306                                 throw noMatch();
307                             }
308                         }
309                     }.get(); // this extracts the received message
310
311                     assertEquals("match", out);
312
313                     assertModification(subject, WriteModification.class);
314                     expectNoMsg();
315                 }
316
317
318             };
319         }};
320     }
321
322     @Test
323     public void testOnReceiveMergeData() throws Exception {
324         new JavaTestKit(getSystem()) {{
325             final ActorRef shard = createShard();
326             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
327                     testSchemaContext, datastoreContext, shardStats);
328             final ActorRef subject =
329                 getSystem().actorOf(props, "testMergeData");
330
331             new Within(duration("1 seconds")) {
332                 @Override
333                 protected void run() {
334
335                     subject.tell(new MergeData(TestModel.TEST_PATH,
336                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
337                         getRef());
338
339                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
340                         // do not put code outside this method, will run afterwards
341                         @Override
342                         protected String match(Object in) {
343                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
344                                 return "match";
345                             } else {
346                                 throw noMatch();
347                             }
348                         }
349                     }.get(); // this extracts the received message
350
351                     assertEquals("match", out);
352
353                     assertModification(subject, MergeModification.class);
354
355                     expectNoMsg();
356                 }
357
358
359             };
360         }};
361     }
362
363     @Test
364     public void testOnReceiveDeleteData() throws Exception {
365         new JavaTestKit(getSystem()) {{
366             final ActorRef shard = createShard();
367             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
368                     testSchemaContext, datastoreContext, shardStats);
369             final ActorRef subject =
370                 getSystem().actorOf(props, "testDeleteData");
371
372             new Within(duration("1 seconds")) {
373                 @Override
374                 protected void run() {
375
376                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
377
378                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
379                         // do not put code outside this method, will run afterwards
380                         @Override
381                         protected String match(Object in) {
382                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
383                                 return "match";
384                             } else {
385                                 throw noMatch();
386                             }
387                         }
388                     }.get(); // this extracts the received message
389
390                     assertEquals("match", out);
391
392                     assertModification(subject, DeleteModification.class);
393                     expectNoMsg();
394                 }
395
396
397             };
398         }};
399     }
400
401
402     @Test
403     public void testOnReceiveReadyTransaction() throws Exception {
404         new JavaTestKit(getSystem()) {{
405             final ActorRef shard = createShard();
406             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
407                     testSchemaContext, datastoreContext, shardStats);
408             final ActorRef subject =
409                 getSystem().actorOf(props, "testReadyTransaction");
410
411             new Within(duration("1 seconds")) {
412                 @Override
413                 protected void run() {
414
415                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
416
417                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
418                         // do not put code outside this method, will run afterwards
419                         @Override
420                         protected String match(Object in) {
421                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
422                                 return "match";
423                             } else {
424                                 throw noMatch();
425                             }
426                         }
427                     }.get(); // this extracts the received message
428
429                     assertEquals("match", out);
430
431                     expectNoMsg();
432                 }
433
434
435             };
436         }};
437
438     }
439
440     @Test
441     public void testOnReceiveCloseTransaction() throws Exception {
442         new JavaTestKit(getSystem()) {{
443             final ActorRef shard = createShard();
444             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
445                     testSchemaContext, datastoreContext, shardStats);
446             final ActorRef subject =
447                 getSystem().actorOf(props, "testCloseTransaction");
448
449             watch(subject);
450
451             new Within(duration("6 seconds")) {
452                 @Override
453                 protected void run() {
454
455                     subject.tell(new CloseTransaction().toSerializable(), getRef());
456
457                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
458                         // do not put code outside this method, will run afterwards
459                         @Override
460                         protected String match(Object in) {
461                             System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
462                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
463                                 return "match";
464                             } else {
465                                 throw noMatch();
466                             }
467                         }
468                     }.get(); // this extracts the received message
469
470                     assertEquals("match", out);
471
472                     final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
473                         // do not put code outside this method, will run afterwards
474                         @Override
475                         protected String match(Object in) {
476                             System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
477                             if (in instanceof Terminated) {
478                                 return "match";
479                             } else {
480                                 throw noMatch();
481                             }
482                         }
483                     }.get(); // this extracts the received message
484
485                     assertEquals("match", termination);
486                 }
487             };
488         }};
489     }
490
491     @Test(expected=UnknownMessageException.class)
492     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
493         final ActorRef shard = createShard();
494         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
495                 testSchemaContext, datastoreContext, shardStats);
496         final TestActorRef subject = TestActorRef.apply(props,getSystem());
497
498         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
499     }
500
501     @Test
502     public void testShardTransactionInactivity() {
503
504         datastoreContext = new DatastoreContext("Test",
505                 InMemoryDOMDataStoreConfigProperties.getDefault(),
506                 Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
507
508         new JavaTestKit(getSystem()) {{
509             final ActorRef shard = createShard();
510             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
511                     testSchemaContext, datastoreContext, shardStats);
512             final ActorRef subject =
513                 getSystem().actorOf(props, "testShardTransactionInactivity");
514
515             watch(subject);
516
517             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
518
519             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
520                 // do not put code outside this method, will run afterwards
521                 @Override
522                 protected String match(Object in) {
523                     if (in instanceof Terminated) {
524                         return "match";
525                     } else {
526                         throw noMatch();
527                     }
528                 }
529             }.get(); // this extracts the received message
530
531             assertEquals("match", termination);
532         }};
533     }
534 }