Implement creating and applying snapshots for a shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8
9 import com.google.common.util.concurrent.ListeningExecutorService;
10 import com.google.common.util.concurrent.MoreExecutors;
11
12 import org.junit.BeforeClass;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41
42 import scala.concurrent.duration.Duration;
43
44 import java.util.Collections;
45 import java.util.concurrent.TimeUnit;
46
47 import static org.junit.Assert.assertEquals;
48 import static org.junit.Assert.assertTrue;
49
50 public class ShardTransactionTest extends AbstractActorTest {
51     private static ListeningExecutorService storeExecutor =
52         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
53
54     private static final InMemoryDOMDataStore store =
55         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
56
57     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
58
59     private static final ShardIdentifier SHARD_IDENTIFIER =
60         ShardIdentifier.builder().memberName("member-1")
61             .shardName("inventory").type("config").build();
62
63     private DatastoreContext datastoreContext = new DatastoreContext();
64
65     @BeforeClass
66     public static void staticSetup() {
67         store.onGlobalContextUpdated(testSchemaContext);
68     }
69
70     private ActorRef createShard(){
71         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
72             Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
73     }
74
75     @Test
76     public void testOnReceiveReadData() throws Exception {
77         new JavaTestKit(getSystem()) {{
78             final ActorRef shard = createShard();
79             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
80                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
81             final ActorRef subject = getSystem().actorOf(props, "testReadData");
82
83             new Within(duration("1 seconds")) {
84                 @Override
85                 protected void run() {
86
87                     subject.tell(
88                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
89                         getRef());
90
91                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
92                         // do not put code outside this method, will run afterwards
93                         @Override
94                         protected String match(Object in) {
95                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
96                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
97                                   .getNormalizedNode()!= null) {
98                                     return "match";
99                                 }
100                                 return null;
101                             } else {
102                                 throw noMatch();
103                             }
104                         }
105                     }.get(); // this extracts the received message
106
107                     assertEquals("match", out);
108
109                     expectNoMsg();
110                 }
111
112
113             };
114         }};
115     }
116
117     @Test
118     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
119         new JavaTestKit(getSystem()) {{
120             final ActorRef shard = createShard();
121             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
122                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
123             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
124
125             new Within(duration("1 seconds")) {
126                 @Override
127                 protected void run() {
128
129                     subject.tell(
130                         new ReadData(TestModel.TEST_PATH).toSerializable(),
131                         getRef());
132
133                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
134                         // do not put code outside this method, will run afterwards
135                         @Override
136                         protected String match(Object in) {
137                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
138                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
139                                     .getNormalizedNode()
140                                     == null) {
141                                     return "match";
142                                 }
143                                 return null;
144                             } else {
145                                 throw noMatch();
146                             }
147                         }
148                     }.get(); // this extracts the received message
149
150                     assertEquals("match", out);
151
152                     expectNoMsg();
153                 }
154
155
156             };
157         }};
158     }
159
160     @Test
161     public void testOnReceiveDataExistsPositive() throws Exception {
162         new JavaTestKit(getSystem()) {{
163             final ActorRef shard = createShard();
164             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
165                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
166             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
167
168             new Within(duration("1 seconds")) {
169                 @Override
170                 protected void run() {
171
172                     subject.tell(
173                         new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
174                         getRef());
175
176                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
177                         // do not put code outside this method, will run afterwards
178                         @Override
179                         protected String match(Object in) {
180                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
181                                 if (DataExistsReply.fromSerializable(in)
182                                     .exists()) {
183                                     return "match";
184                                 }
185                                 return null;
186                             } else {
187                                 throw noMatch();
188                             }
189                         }
190                     }.get(); // this extracts the received message
191
192                     assertEquals("match", out);
193
194                     expectNoMsg();
195                 }
196
197
198             };
199         }};
200     }
201
202     @Test
203     public void testOnReceiveDataExistsNegative() throws Exception {
204         new JavaTestKit(getSystem()) {{
205             final ActorRef shard = createShard();
206             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
207                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
208             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
209
210             new Within(duration("1 seconds")) {
211                 @Override
212                 protected void run() {
213
214                     subject.tell(
215                         new DataExists(TestModel.TEST_PATH).toSerializable(),
216                         getRef());
217
218                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
219                         // do not put code outside this method, will run afterwards
220                         @Override
221                         protected String match(Object in) {
222                             if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
223                                 if (!DataExistsReply.fromSerializable(in)
224                                     .exists()) {
225                                     return "match";
226                                 }
227                                 return null;
228                             } else {
229                                 throw noMatch();
230                             }
231                         }
232                     }.get(); // this extracts the received message
233
234                     assertEquals("match", out);
235
236                     expectNoMsg();
237                 }
238
239
240             };
241         }};
242     }
243
244     private void assertModification(final ActorRef subject,
245         final Class<? extends Modification> modificationType) {
246         new JavaTestKit(getSystem()) {{
247             new Within(duration("1 seconds")) {
248                 @Override
249                 protected void run() {
250                     subject
251                         .tell(new ShardTransaction.GetCompositedModification(),
252                             getRef());
253
254                     final CompositeModification compositeModification =
255                         new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
256                             // do not put code outside this method, will run afterwards
257                             @Override
258                             protected CompositeModification match(Object in) {
259                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
260                                     return ((ShardTransaction.GetCompositeModificationReply) in)
261                                         .getModification();
262                                 } else {
263                                     throw noMatch();
264                                 }
265                             }
266                         }.get(); // this extracts the received message
267
268                     assertTrue(
269                         compositeModification.getModifications().size() == 1);
270                     assertEquals(modificationType,
271                         compositeModification.getModifications().get(0)
272                             .getClass());
273
274                 }
275             };
276         }};
277     }
278
279     @Test
280     public void testOnReceiveWriteData() throws Exception {
281         new JavaTestKit(getSystem()) {{
282             final ActorRef shard = createShard();
283             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
284                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
285             final ActorRef subject =
286                 getSystem().actorOf(props, "testWriteData");
287
288             new Within(duration("1 seconds")) {
289                 @Override
290                 protected void run() {
291
292                     subject.tell(new WriteData(TestModel.TEST_PATH,
293                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
294                         getRef());
295
296                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
297                         // do not put code outside this method, will run afterwards
298                         @Override
299                         protected String match(Object in) {
300                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
301                                 return "match";
302                             } else {
303                                 throw noMatch();
304                             }
305                         }
306                     }.get(); // this extracts the received message
307
308                     assertEquals("match", out);
309
310                     assertModification(subject, WriteModification.class);
311                     expectNoMsg();
312                 }
313
314
315             };
316         }};
317     }
318
319     @Test
320     public void testOnReceiveMergeData() throws Exception {
321         new JavaTestKit(getSystem()) {{
322             final ActorRef shard = createShard();
323             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
324                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
325             final ActorRef subject =
326                 getSystem().actorOf(props, "testMergeData");
327
328             new Within(duration("1 seconds")) {
329                 @Override
330                 protected void run() {
331
332                     subject.tell(new MergeData(TestModel.TEST_PATH,
333                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
334                         getRef());
335
336                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
337                         // do not put code outside this method, will run afterwards
338                         @Override
339                         protected String match(Object in) {
340                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
341                                 return "match";
342                             } else {
343                                 throw noMatch();
344                             }
345                         }
346                     }.get(); // this extracts the received message
347
348                     assertEquals("match", out);
349
350                     assertModification(subject, MergeModification.class);
351
352                     expectNoMsg();
353                 }
354
355
356             };
357         }};
358     }
359
360     @Test
361     public void testOnReceiveDeleteData() throws Exception {
362         new JavaTestKit(getSystem()) {{
363             final ActorRef shard = createShard();
364             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
365                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
366             final ActorRef subject =
367                 getSystem().actorOf(props, "testDeleteData");
368
369             new Within(duration("1 seconds")) {
370                 @Override
371                 protected void run() {
372
373                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
374
375                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
376                         // do not put code outside this method, will run afterwards
377                         @Override
378                         protected String match(Object in) {
379                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
380                                 return "match";
381                             } else {
382                                 throw noMatch();
383                             }
384                         }
385                     }.get(); // this extracts the received message
386
387                     assertEquals("match", out);
388
389                     assertModification(subject, DeleteModification.class);
390                     expectNoMsg();
391                 }
392
393
394             };
395         }};
396     }
397
398
399     @Test
400     public void testOnReceiveReadyTransaction() throws Exception {
401         new JavaTestKit(getSystem()) {{
402             final ActorRef shard = createShard();
403             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
404                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
405             final ActorRef subject =
406                 getSystem().actorOf(props, "testReadyTransaction");
407
408             new Within(duration("1 seconds")) {
409                 @Override
410                 protected void run() {
411
412                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
413
414                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
415                         // do not put code outside this method, will run afterwards
416                         @Override
417                         protected String match(Object in) {
418                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
419                                 return "match";
420                             } else {
421                                 throw noMatch();
422                             }
423                         }
424                     }.get(); // this extracts the received message
425
426                     assertEquals("match", out);
427
428                     expectNoMsg();
429                 }
430
431
432             };
433         }};
434
435     }
436
437     @Test
438     public void testOnReceiveCloseTransaction() throws Exception {
439         new JavaTestKit(getSystem()) {{
440             final ActorRef shard = createShard();
441             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
442                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
443             final ActorRef subject =
444                 getSystem().actorOf(props, "testCloseTransaction");
445
446             watch(subject);
447
448             new Within(duration("6 seconds")) {
449                 @Override
450                 protected void run() {
451
452                     subject.tell(new CloseTransaction().toSerializable(), getRef());
453
454                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
455                         // do not put code outside this method, will run afterwards
456                         @Override
457                         protected String match(Object in) {
458                             System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
459                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
460                                 return "match";
461                             } else {
462                                 throw noMatch();
463                             }
464                         }
465                     }.get(); // this extracts the received message
466
467                     assertEquals("match", out);
468
469                     final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
470                         // do not put code outside this method, will run afterwards
471                         @Override
472                         protected String match(Object in) {
473                             System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
474                             if (in instanceof Terminated) {
475                                 return "match";
476                             } else {
477                                 throw noMatch();
478                             }
479                         }
480                     }.get(); // this extracts the received message
481
482                     assertEquals("match", termination);
483                 }
484             };
485         }};
486     }
487
488     @Test(expected=UnknownMessageException.class)
489     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
490         final ActorRef shard = createShard();
491         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
492                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
493         final TestActorRef subject = TestActorRef.apply(props,getSystem());
494
495         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
496     }
497
498     @Test
499     public void testShardTransactionInactivity() {
500
501         datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
502                 Duration.create(500, TimeUnit.MILLISECONDS));
503
504         new JavaTestKit(getSystem()) {{
505             final ActorRef shard = createShard();
506             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
507                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
508             final ActorRef subject =
509                 getSystem().actorOf(props, "testShardTransactionInactivity");
510
511             watch(subject);
512
513             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
514
515             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
516                 // do not put code outside this method, will run afterwards
517                 @Override
518                 protected String match(Object in) {
519                     if (in instanceof Terminated) {
520                         return "match";
521                     } else {
522                         throw noMatch();
523                     }
524                 }
525             }.get(); // this extracts the received message
526
527             assertEquals("match", termination);
528         }};
529     }
530 }