Bug 1446: Add JMX stats for clustered data store
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8
9 import com.google.common.util.concurrent.ListeningExecutorService;
10 import com.google.common.util.concurrent.MoreExecutors;
11
12 import org.junit.BeforeClass;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
31 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
33 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
34 import org.opendaylight.controller.cluster.datastore.modification.Modification;
35 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42
43 import scala.concurrent.duration.Duration;
44
45 import java.util.Collections;
46 import java.util.concurrent.TimeUnit;
47
48 import static org.junit.Assert.assertEquals;
49 import static org.junit.Assert.assertTrue;
50
51 public class ShardTransactionTest extends AbstractActorTest {
52     private static ListeningExecutorService storeExecutor =
53         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
54
55     private static final InMemoryDOMDataStore store =
56         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
57
58     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59
60     private static final ShardIdentifier SHARD_IDENTIFIER =
61         ShardIdentifier.builder().memberName("member-1")
62             .shardName("inventory").type("config").build();
63
64     private DatastoreContext datastoreContext = new DatastoreContext();
65
66     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
67
68     @BeforeClass
69     public static void staticSetup() {
70         store.onGlobalContextUpdated(testSchemaContext);
71     }
72
73     @Test
74     public void testOnReceiveReadData() throws Exception {
75         new JavaTestKit(getSystem()) {{
76             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
77                     Collections.EMPTY_MAP, new DatastoreContext()));
78             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
79                     testSchemaContext, datastoreContext, shardStats);
80             final ActorRef subject = getSystem().actorOf(props, "testReadData");
81
82             new Within(duration("1 seconds")) {
83                 @Override
84                 protected void run() {
85
86                     subject.tell(
87                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
88                         getRef());
89
90                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
91                         // do not put code outside this method, will run afterwards
92                         @Override
93                         protected String match(Object in) {
94                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
95                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
96                                   .getNormalizedNode()!= null) {
97                                     return "match";
98                                 }
99                                 return null;
100                             } else {
101                                 throw noMatch();
102                             }
103                         }
104                     }.get(); // this extracts the received message
105
106                     assertEquals("match", out);
107
108                     expectNoMsg();
109                 }
110
111
112             };
113         }};
114     }
115
116     @Test
117     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
118         new JavaTestKit(getSystem()) {{
119             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
120                     Collections.EMPTY_MAP, new DatastoreContext()));
121             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
122                     testSchemaContext, datastoreContext, shardStats);
123             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
124
125             new Within(duration("1 seconds")) {
126                 @Override
127                 protected void run() {
128
129                     subject.tell(
130                         new ReadData(TestModel.TEST_PATH).toSerializable(),
131                         getRef());
132
133                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
134                         // do not put code outside this method, will run afterwards
135                         @Override
136                         protected String match(Object in) {
137                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
138                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
139                                     .getNormalizedNode()
140                                     == null) {
141                                     return "match";
142                                 }
143                                 return null;
144                             } else {
145                                 throw noMatch();
146                             }
147                         }
148                     }.get(); // this extracts the received message
149
150                     assertEquals("match", out);
151
152                     expectNoMsg();
153                 }
154
155
156             };
157         }};
158     }
159
160     @Test
161     public void testOnReceiveDataExistsPositive() throws Exception {
162         new JavaTestKit(getSystem()) {{
163             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
164                     Collections.EMPTY_MAP, new DatastoreContext()));
165             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
166                     testSchemaContext, datastoreContext, shardStats);
167             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
168
169             new Within(duration("1 seconds")) {
170                 @Override
171                 protected void run() {
172
173                     subject.tell(
174                         new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
175                         getRef());
176
177                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
178                         // do not put code outside this method, will run afterwards
179                         @Override
180                         protected String match(Object in) {
181                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
182                                 if (DataExistsReply.fromSerializable(in)
183                                     .exists()) {
184                                     return "match";
185                                 }
186                                 return null;
187                             } else {
188                                 throw noMatch();
189                             }
190                         }
191                     }.get(); // this extracts the received message
192
193                     assertEquals("match", out);
194
195                     expectNoMsg();
196                 }
197
198
199             };
200         }};
201     }
202
203     @Test
204     public void testOnReceiveDataExistsNegative() throws Exception {
205         new JavaTestKit(getSystem()) {{
206             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
207                     Collections.EMPTY_MAP, new DatastoreContext()));
208             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
209                     testSchemaContext, datastoreContext, shardStats);
210             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
211
212             new Within(duration("1 seconds")) {
213                 @Override
214                 protected void run() {
215
216                     subject.tell(
217                         new DataExists(TestModel.TEST_PATH).toSerializable(),
218                         getRef());
219
220                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
221                         // do not put code outside this method, will run afterwards
222                         @Override
223                         protected String match(Object in) {
224                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
225                                 if (!DataExistsReply.fromSerializable(in)
226                                     .exists()) {
227                                     return "match";
228                                 }
229                                 return null;
230                             } else {
231                                 throw noMatch();
232                             }
233                         }
234                     }.get(); // this extracts the received message
235
236                     assertEquals("match", out);
237
238                     expectNoMsg();
239                 }
240
241
242             };
243         }};
244     }
245
246     private void assertModification(final ActorRef subject,
247         final Class<? extends Modification> modificationType) {
248         new JavaTestKit(getSystem()) {{
249             new Within(duration("1 seconds")) {
250                 @Override
251                 protected void run() {
252                     subject
253                         .tell(new ShardTransaction.GetCompositedModification(),
254                             getRef());
255
256                     final CompositeModification compositeModification =
257                         new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
258                             // do not put code outside this method, will run afterwards
259                             @Override
260                             protected CompositeModification match(Object in) {
261                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
262                                     return ((ShardTransaction.GetCompositeModificationReply) in)
263                                         .getModification();
264                                 } else {
265                                     throw noMatch();
266                                 }
267                             }
268                         }.get(); // this extracts the received message
269
270                     assertTrue(
271                         compositeModification.getModifications().size() == 1);
272                     assertEquals(modificationType,
273                         compositeModification.getModifications().get(0)
274                             .getClass());
275
276                 }
277             };
278         }};
279     }
280
281     @Test
282     public void testOnReceiveWriteData() throws Exception {
283         new JavaTestKit(getSystem()) {{
284             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
285                     Collections.EMPTY_MAP, new DatastoreContext()));
286             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
287                     testSchemaContext, datastoreContext, shardStats);
288             final ActorRef subject =
289                 getSystem().actorOf(props, "testWriteData");
290
291             new Within(duration("1 seconds")) {
292                 @Override
293                 protected void run() {
294
295                     subject.tell(new WriteData(TestModel.TEST_PATH,
296                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
297                         getRef());
298
299                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
300                         // do not put code outside this method, will run afterwards
301                         @Override
302                         protected String match(Object in) {
303                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
304                                 return "match";
305                             } else {
306                                 throw noMatch();
307                             }
308                         }
309                     }.get(); // this extracts the received message
310
311                     assertEquals("match", out);
312
313                     assertModification(subject, WriteModification.class);
314                     expectNoMsg();
315                 }
316
317
318             };
319         }};
320     }
321
322     @Test
323     public void testOnReceiveMergeData() throws Exception {
324         new JavaTestKit(getSystem()) {{
325             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
326                     Collections.EMPTY_MAP, new DatastoreContext()));
327             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
328                     testSchemaContext, datastoreContext, shardStats);
329             final ActorRef subject =
330                 getSystem().actorOf(props, "testMergeData");
331
332             new Within(duration("1 seconds")) {
333                 @Override
334                 protected void run() {
335
336                     subject.tell(new MergeData(TestModel.TEST_PATH,
337                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
338                         getRef());
339
340                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
341                         // do not put code outside this method, will run afterwards
342                         @Override
343                         protected String match(Object in) {
344                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
345                                 return "match";
346                             } else {
347                                 throw noMatch();
348                             }
349                         }
350                     }.get(); // this extracts the received message
351
352                     assertEquals("match", out);
353
354                     assertModification(subject, MergeModification.class);
355
356                     expectNoMsg();
357                 }
358
359
360             };
361         }};
362     }
363
364     @Test
365     public void testOnReceiveDeleteData() throws Exception {
366         new JavaTestKit(getSystem()) {{
367             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
368                     Collections.EMPTY_MAP, new DatastoreContext()));
369             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
370                     testSchemaContext, datastoreContext, shardStats);
371             final ActorRef subject =
372                 getSystem().actorOf(props, "testDeleteData");
373
374             new Within(duration("1 seconds")) {
375                 @Override
376                 protected void run() {
377
378                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
379
380                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
381                         // do not put code outside this method, will run afterwards
382                         @Override
383                         protected String match(Object in) {
384                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
385                                 return "match";
386                             } else {
387                                 throw noMatch();
388                             }
389                         }
390                     }.get(); // this extracts the received message
391
392                     assertEquals("match", out);
393
394                     assertModification(subject, DeleteModification.class);
395                     expectNoMsg();
396                 }
397
398
399             };
400         }};
401     }
402
403
404     @Test
405     public void testOnReceiveReadyTransaction() throws Exception {
406         new JavaTestKit(getSystem()) {{
407             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
408                     Collections.EMPTY_MAP, new DatastoreContext()));
409             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
410                     testSchemaContext, datastoreContext, shardStats);
411             final ActorRef subject =
412                 getSystem().actorOf(props, "testReadyTransaction");
413
414             new Within(duration("1 seconds")) {
415                 @Override
416                 protected void run() {
417
418                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
419
420                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
421                         // do not put code outside this method, will run afterwards
422                         @Override
423                         protected String match(Object in) {
424                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
425                                 return "match";
426                             } else {
427                                 throw noMatch();
428                             }
429                         }
430                     }.get(); // this extracts the received message
431
432                     assertEquals("match", out);
433
434                     expectNoMsg();
435                 }
436
437
438             };
439         }};
440
441     }
442
443     @Test
444     public void testOnReceiveCloseTransaction() throws Exception {
445         new JavaTestKit(getSystem()) {{
446             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
447                     Collections.EMPTY_MAP, new DatastoreContext()));
448             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
449                     testSchemaContext, datastoreContext, shardStats);
450             final ActorRef subject =
451                 getSystem().actorOf(props, "testCloseTransaction");
452
453             watch(subject);
454
455             new Within(duration("6 seconds")) {
456                 @Override
457                 protected void run() {
458
459                     subject.tell(new CloseTransaction().toSerializable(), getRef());
460
461                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
462                         // do not put code outside this method, will run afterwards
463                         @Override
464                         protected String match(Object in) {
465                             System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
466                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
467                                 return "match";
468                             } else {
469                                 throw noMatch();
470                             }
471                         }
472                     }.get(); // this extracts the received message
473
474                     assertEquals("match", out);
475
476                     final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
477                         // do not put code outside this method, will run afterwards
478                         @Override
479                         protected String match(Object in) {
480                             System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
481                             if (in instanceof Terminated) {
482                                 return "match";
483                             } else {
484                                 throw noMatch();
485                             }
486                         }
487                     }.get(); // this extracts the received message
488
489                     assertEquals("match", termination);
490                 }
491             };
492         }};
493     }
494
495     @Test(expected=UnknownMessageException.class)
496     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
497         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
498                 Collections.EMPTY_MAP, new DatastoreContext()));
499         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
500                 testSchemaContext, datastoreContext, shardStats);
501         final TestActorRef subject = TestActorRef.apply(props,getSystem());
502
503         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
504     }
505
506     @Test
507     public void testShardTransactionInactivity() {
508
509         datastoreContext = new DatastoreContext("Test",
510                 InMemoryDOMDataStoreConfigProperties.getDefault(),
511                 Duration.create(500, TimeUnit.MILLISECONDS), 5);
512
513         new JavaTestKit(getSystem()) {{
514             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
515                     Collections.EMPTY_MAP, new DatastoreContext()));
516             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
517                     testSchemaContext, datastoreContext, shardStats);
518             final ActorRef subject =
519                 getSystem().actorOf(props, "testShardTransactionInactivity");
520
521             watch(subject);
522
523             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
524
525             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
526                 // do not put code outside this method, will run afterwards
527                 @Override
528                 protected String match(Object in) {
529                     if (in instanceof Terminated) {
530                         return "match";
531                     } else {
532                         throw noMatch();
533                     }
534                 }
535             }.get(); // this extracts the received message
536
537             assertEquals("match", termination);
538         }};
539     }
540 }