BUG 1815 - Do not allow Shards to be created till an appropriate schema context is...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7 import akka.japi.Procedure;
8 import akka.persistence.PersistentConfirmation;
9 import akka.persistence.PersistentId;
10 import akka.persistence.PersistentImpl;
11 import akka.persistence.PersistentRepr;
12 import akka.persistence.journal.japi.AsyncWriteJournal;
13 import akka.testkit.JavaTestKit;
14 import akka.testkit.TestActorRef;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import com.typesafe.config.Config;
19 import com.typesafe.config.ConfigFactory;
20 import com.typesafe.config.ConfigValueFactory;
21 import org.junit.AfterClass;
22 import org.junit.Before;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
32 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import scala.concurrent.Future;
38
39 import java.net.URI;
40 import java.util.Collection;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.Map;
44 import java.util.Set;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.TimeUnit;
47
48 import static junit.framework.Assert.assertEquals;
49 import static org.junit.Assert.assertFalse;
50 import static org.junit.Assert.assertTrue;
51 import static org.mockito.Mockito.mock;
52 import static org.mockito.Mockito.when;
53
54 public class ShardManagerTest {
55     private static ActorSystem system;
56
57     @BeforeClass
58     public static void setUpClass() {
59         Map<String, String> myJournal = new HashMap<>();
60         myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
61         myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
62         Config config = ConfigFactory.load()
63             .withValue("akka.persistence.journal.plugin",
64                 ConfigValueFactory.fromAnyRef("my-journal"))
65             .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
66
67         MyJournal.clear();
68
69         system = ActorSystem.create("test", config);
70     }
71
72     @AfterClass
73     public static void tearDown() {
74         JavaTestKit.shutdownActorSystem(system);
75         system = null;
76     }
77
78     @Before
79     public void setUpTest(){
80         MyJournal.clear();
81     }
82
83     @Test
84     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
85
86         new JavaTestKit(system) {
87             {
88                 final Props props = ShardManager
89                     .props("config", new MockClusterWrapper(),
90                         new MockConfiguration(), new DatastoreContext());
91
92                 final ActorRef subject = getSystem().actorOf(props);
93
94                 subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
95
96                 expectMsgEquals(duration("2 seconds"),
97                     new PrimaryNotFound("inventory").toSerializable());
98             }};
99     }
100
101     @Test
102     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
103
104         new JavaTestKit(system) {{
105             final Props props = ShardManager
106                 .props("config", new MockClusterWrapper(),
107                     new MockConfiguration(), new DatastoreContext());
108
109             final ActorRef subject = getSystem().actorOf(props);
110
111             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
112
113             subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
114
115             expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
116         }};
117     }
118
119     @Test
120     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
121
122         new JavaTestKit(system) {{
123             final Props props = ShardManager
124                 .props("config", new MockClusterWrapper(),
125                     new MockConfiguration(), new DatastoreContext());
126
127             final ActorRef subject = getSystem().actorOf(props);
128
129             subject.tell(new FindLocalShard("inventory"), getRef());
130
131             final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
132                 @Override
133                 protected String match(Object in) {
134                     if (in instanceof LocalShardNotFound) {
135                         return ((LocalShardNotFound) in).getShardName();
136                     } else {
137                         throw noMatch();
138                     }
139                 }
140             }.get(); // this extracts the received message
141
142             assertEquals("inventory", out);
143         }};
144     }
145
146     @Test
147     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
148
149         final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
150
151         new JavaTestKit(system) {{
152             final Props props = ShardManager
153                 .props("config", mockClusterWrapper,
154                     new MockConfiguration(), new DatastoreContext());
155
156             final ActorRef subject = getSystem().actorOf(props);
157
158             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159
160             subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
161
162             final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
163                 @Override
164                 protected ActorRef match(Object in) {
165                     if (in instanceof LocalShardFound) {
166                         return ((LocalShardFound) in).getPath();
167                     } else {
168                         throw noMatch();
169                     }
170                 }
171             }.get(); // this extracts the received message
172
173             assertTrue(out.path().toString(),
174                 out.path().toString().contains("member-1-shard-default-config"));
175         }};
176     }
177
178     @Test
179     public void testOnReceiveMemberUp() throws Exception {
180
181         new JavaTestKit(system) {{
182             final Props props = ShardManager
183                 .props("config", new MockClusterWrapper(),
184                     new MockConfiguration(), new DatastoreContext());
185
186             final ActorRef subject = getSystem().actorOf(props);
187
188             MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
189
190             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
191
192             final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
193                 // do not put code outside this method, will run afterwards
194                 @Override
195                 protected String match(Object in) {
196                     if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
197                         PrimaryFound f = PrimaryFound.fromSerializable(in);
198                         return f.getPrimaryPath();
199                     } else {
200                         throw noMatch();
201                     }
202                 }
203             }.get(); // this extracts the received message
204
205             assertTrue(out, out.contains("member-2-shard-astronauts-config"));
206         }};
207     }
208
209     @Test
210     public void testOnReceiveMemberDown() throws Exception {
211
212         new JavaTestKit(system) {{
213             final Props props = ShardManager
214                 .props("config", new MockClusterWrapper(),
215                     new MockConfiguration(), new DatastoreContext());
216
217             final ActorRef subject = getSystem().actorOf(props);
218
219             MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
220
221             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
222
223             expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
224
225             MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
226
227             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
228
229             expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
230         }};
231     }
232
233     @Test
234     public void testOnRecoveryJournalIsEmptied(){
235         MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
236             ImmutableSet.of("foo")));
237
238         assertEquals(1, MyJournal.get().size());
239
240         new JavaTestKit(system) {{
241             final Props props = ShardManager
242                 .props("config", new MockClusterWrapper(),
243                     new MockConfiguration(), new DatastoreContext());
244
245             final ActorRef subject = getSystem().actorOf(props);
246
247             // Send message to check that ShardManager is ready
248             subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
249
250             expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
251
252             assertEquals(0, MyJournal.get().size());
253         }};
254     }
255
256     @Test
257     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
258         new JavaTestKit(system) {{
259             final Props props = ShardManager
260                 .props("config", new MockClusterWrapper(),
261                     new MockConfiguration(), new DatastoreContext());
262             final TestActorRef<ShardManager> subject =
263                 TestActorRef.create(system, props);
264
265             subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
266
267             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
268
269             assertTrue(knownModules.contains("foo"));
270         }};
271     }
272
273     @Test
274     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
275         throws Exception {
276         new JavaTestKit(system) {{
277             final Props props = ShardManager
278                 .props("config", new MockClusterWrapper(),
279                     new MockConfiguration(), new DatastoreContext());
280             final TestActorRef<ShardManager> subject =
281                 TestActorRef.create(system, props);
282
283             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
284
285             assertEquals(0, knownModules.size());
286
287             SchemaContext schemaContext = mock(SchemaContext.class);
288             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
289
290             ModuleIdentifier foo = mock(ModuleIdentifier.class);
291             when(foo.getNamespace()).thenReturn(new URI("foo"));
292
293             moduleIdentifierSet.add(foo);
294
295             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
296
297             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
298
299             assertTrue(knownModules.contains("foo"));
300
301             assertEquals(1, knownModules.size());
302
303             ModuleIdentifier bar = mock(ModuleIdentifier.class);
304             when(bar.getNamespace()).thenReturn(new URI("bar"));
305
306             moduleIdentifierSet.add(bar);
307
308             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
309
310             assertTrue(knownModules.contains("bar"));
311
312             assertEquals(2, knownModules.size());
313
314         }};
315
316     }
317
318
319     @Test
320     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
321         throws Exception {
322         new JavaTestKit(system) {{
323             final Props props = ShardManager
324                 .props("config", new MockClusterWrapper(),
325                     new MockConfiguration(), new DatastoreContext());
326             final TestActorRef<ShardManager> subject =
327                 TestActorRef.create(system, props);
328
329             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
330
331             assertEquals(0, knownModules.size());
332
333             SchemaContext schemaContext = mock(SchemaContext.class);
334             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
335
336             ModuleIdentifier foo = mock(ModuleIdentifier.class);
337             when(foo.getNamespace()).thenReturn(new URI("foo"));
338
339             moduleIdentifierSet.add(foo);
340
341             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
342
343             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
344
345             assertTrue(knownModules.contains("foo"));
346
347             assertEquals(1, knownModules.size());
348
349             //Create a completely different SchemaContext with only the bar module in it
350             schemaContext = mock(SchemaContext.class);
351             moduleIdentifierSet = new HashSet<>();
352             ModuleIdentifier bar = mock(ModuleIdentifier.class);
353             when(bar.getNamespace()).thenReturn(new URI("bar"));
354
355             moduleIdentifierSet.add(bar);
356
357             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
358
359             assertFalse(knownModules.contains("bar"));
360
361             assertEquals(1, knownModules.size());
362
363         }};
364
365     }
366
367
368     private void sleep(long period){
369         Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
370     }
371
372     public static class MyJournal extends AsyncWriteJournal {
373
374         private static Map<Long, Object> journal = Maps.newTreeMap();
375
376         public static void addToJournal(Long sequenceNr, Object value){
377             journal.put(sequenceNr, value);
378         }
379
380         public static Map<Long, Object> get(){
381             return journal;
382         }
383
384         public static void clear(){
385             journal.clear();
386         }
387
388         @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
389             final Procedure<PersistentRepr> replayCallback) {
390             if(journal.size() == 0){
391                 return Futures.successful(null);
392             }
393             return Futures.future(new Callable<Void>() {
394                 @Override
395                 public Void call() throws Exception {
396                     for (Map.Entry<Long, Object> entry : journal.entrySet()) {
397                         PersistentRepr persistentMessage =
398                             new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
399                                 false, null, null);
400                         replayCallback.apply(persistentMessage);
401                     }
402                     return null;
403                 }
404             }, context().dispatcher());
405         }
406
407         @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
408             return Futures.successful(-1L);
409         }
410
411         @Override public Future<Void> doAsyncWriteMessages(
412             final Iterable<PersistentRepr> persistentReprs) {
413             return Futures.future(new Callable<Void>() {
414                 @Override
415                 public Void call() throws Exception {
416                     for (PersistentRepr repr : persistentReprs){
417                         if(repr.payload() instanceof ShardManager.SchemaContextModules) {
418                             journal.put(repr.sequenceNr(), repr.payload());
419                         }
420                     }
421                     return null;
422                 }
423             }, context().dispatcher());
424         }
425
426         @Override public Future<Void> doAsyncWriteConfirmations(
427             Iterable<PersistentConfirmation> persistentConfirmations) {
428             return Futures.successful(null);
429         }
430
431         @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
432             boolean b) {
433             clear();
434             return Futures.successful(null);
435         }
436
437         @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
438             clear();
439             return Futures.successful(null);
440         }
441     }
442 }