Merge "Bug 865: Fixed use of removed deprecated YANGInstanceIdentifier methods."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7 import akka.japi.Procedure;
8 import akka.persistence.PersistentConfirmation;
9 import akka.persistence.PersistentId;
10 import akka.persistence.PersistentImpl;
11 import akka.persistence.PersistentRepr;
12 import akka.persistence.journal.japi.AsyncWriteJournal;
13 import akka.testkit.JavaTestKit;
14 import akka.testkit.TestActorRef;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import com.typesafe.config.Config;
19 import com.typesafe.config.ConfigFactory;
20 import com.typesafe.config.ConfigValueFactory;
21 import org.junit.AfterClass;
22 import org.junit.Before;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
32 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
33 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
34 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
35 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
37 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import scala.concurrent.Future;
41
42 import java.net.URI;
43 import java.util.Collection;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Map;
47 import java.util.Set;
48 import java.util.concurrent.Callable;
49 import java.util.concurrent.TimeUnit;
50
51 import static junit.framework.Assert.assertEquals;
52 import static org.junit.Assert.assertFalse;
53 import static org.junit.Assert.assertTrue;
54 import static org.mockito.Mockito.mock;
55 import static org.mockito.Mockito.when;
56
57 public class ShardManagerTest {
58     private static ActorSystem system;
59     Configuration mockConfig = new MockConfiguration();
60     private static ActorRef defaultShardMockActor;
61
62     @BeforeClass
63     public static void setUpClass() {
64         Map<String, String> myJournal = new HashMap<>();
65         myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
66         myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
67         Config config = ConfigFactory.load()
68                 .withValue("akka.persistence.journal.plugin",
69                         ConfigValueFactory.fromAnyRef("my-journal"))
70                 .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
71
72         MyJournal.clear();
73
74         system = ActorSystem.create("test", config);
75
76         String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
77         defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
78
79
80     }
81
82     @AfterClass
83     public static void tearDown() {
84         JavaTestKit.shutdownActorSystem(system);
85         system = null;
86     }
87
88     @Before
89     public void setUpTest(){
90         MyJournal.clear();
91     }
92
93     @Test
94     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
95
96         new JavaTestKit(system) {
97             {
98                 final Props props = ShardManager
99                         .props("config", new MockClusterWrapper(),
100                                 new MockConfiguration(), DatastoreContext.newBuilder().build());
101
102                 final ActorRef subject = getSystem().actorOf(props);
103
104                 subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
105
106                 expectMsgEquals(duration("2 seconds"),
107                         new PrimaryNotFound("inventory").toSerializable());
108             }};
109     }
110
111     @Test
112     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
113
114         new JavaTestKit(system) {{
115             final Props props = ShardManager
116                     .props("config", new MockClusterWrapper(),
117                             new MockConfiguration(), DatastoreContext.newBuilder().build());
118
119             final ActorRef subject = getSystem().actorOf(props);
120
121             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
122             subject.tell(new ActorInitialized(), defaultShardMockActor);
123
124             subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
125
126             expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
127         }
128         };
129     }
130
131     @Test
132     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
133
134         new JavaTestKit(system) {{
135             final Props props = ShardManager
136                     .props("config", new MockClusterWrapper(),
137                             new MockConfiguration(), DatastoreContext.newBuilder().build());
138
139             final ActorRef subject = getSystem().actorOf(props);
140
141             subject.tell(new FindLocalShard("inventory"), getRef());
142
143             final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
144                 @Override
145                 protected String match(Object in) {
146                     if (in instanceof LocalShardNotFound) {
147                         return ((LocalShardNotFound) in).getShardName();
148                     } else {
149                         throw noMatch();
150                     }
151                 }
152             }.get(); // this extracts the received message
153
154             assertEquals("inventory", out);
155         }};
156     }
157
158     @Test
159     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
160
161         final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
162
163         new JavaTestKit(system) {{
164             final Props props = ShardManager
165                     .props("config", mockClusterWrapper,
166                             new MockConfiguration(), DatastoreContext.newBuilder().build());
167
168             final ActorRef subject = getSystem().actorOf(props);
169
170             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
171             subject.tell(new ActorInitialized(), defaultShardMockActor);
172
173             subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
174
175             final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
176                 @Override
177                 protected ActorRef match(Object in) {
178                     if (in instanceof LocalShardFound) {
179                         return ((LocalShardFound) in).getPath();
180                     } else {
181                         throw noMatch();
182                     }
183                 }
184             }.get(); // this extracts the received message
185
186             assertTrue(out.path().toString(),
187                     out.path().toString().contains("member-1-shard-default-config"));
188         }};
189     }
190
191     @Test
192     public void testOnReceiveMemberUp() throws Exception {
193
194         new JavaTestKit(system) {{
195             final Props props = ShardManager
196                     .props("config", new MockClusterWrapper(),
197                             new MockConfiguration(), DatastoreContext.newBuilder().build());
198
199             final ActorRef subject = getSystem().actorOf(props);
200
201             MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
202
203             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
204
205             final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
206                 // do not put code outside this method, will run afterwards
207                 @Override
208                 protected String match(Object in) {
209                     if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
210                         PrimaryFound f = PrimaryFound.fromSerializable(in);
211                         return f.getPrimaryPath();
212                     } else {
213                         throw noMatch();
214                     }
215                 }
216             }.get(); // this extracts the received message
217
218             assertTrue(out, out.contains("member-2-shard-astronauts-config"));
219         }};
220     }
221
222     @Test
223     public void testOnReceiveMemberDown() throws Exception {
224
225         new JavaTestKit(system) {{
226             final Props props = ShardManager
227                     .props("config", new MockClusterWrapper(),
228                             new MockConfiguration(), DatastoreContext.newBuilder().build());
229
230             final ActorRef subject = getSystem().actorOf(props);
231
232             MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
233
234             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
235
236             expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
237
238             MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
239
240             subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
241
242             expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
243         }};
244     }
245
246     @Test
247     public void testOnRecoveryJournalIsEmptied(){
248         MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
249                 ImmutableSet.of("foo")));
250
251         assertEquals(1, MyJournal.get().size());
252
253         new JavaTestKit(system) {{
254             final Props props = ShardManager
255                     .props("config", new MockClusterWrapper(),
256                             new MockConfiguration(), DatastoreContext.newBuilder().build());
257
258             final ActorRef subject = getSystem().actorOf(props);
259
260             // Send message to check that ShardManager is ready
261             subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
262
263             expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
264
265             assertEquals(0, MyJournal.get().size());
266         }};
267     }
268
269     @Test
270     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
271         new JavaTestKit(system) {{
272             final Props props = ShardManager
273                     .props("config", new MockClusterWrapper(),
274                             new MockConfiguration(), DatastoreContext.newBuilder().build());
275             final TestActorRef<ShardManager> subject =
276                     TestActorRef.create(system, props);
277
278             subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
279
280             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
281
282             assertTrue(knownModules.contains("foo"));
283         }};
284     }
285
286     @Test
287     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
288             throws Exception {
289         new JavaTestKit(system) {{
290             final Props props = ShardManager
291                     .props("config", new MockClusterWrapper(),
292                             new MockConfiguration(), DatastoreContext.newBuilder().build());
293             final TestActorRef<ShardManager> subject =
294                     TestActorRef.create(system, props);
295
296             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
297
298             assertEquals(0, knownModules.size());
299
300             SchemaContext schemaContext = mock(SchemaContext.class);
301             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
302
303             ModuleIdentifier foo = mock(ModuleIdentifier.class);
304             when(foo.getNamespace()).thenReturn(new URI("foo"));
305
306             moduleIdentifierSet.add(foo);
307
308             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
309
310             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
311
312             assertTrue(knownModules.contains("foo"));
313
314             assertEquals(1, knownModules.size());
315
316             ModuleIdentifier bar = mock(ModuleIdentifier.class);
317             when(bar.getNamespace()).thenReturn(new URI("bar"));
318
319             moduleIdentifierSet.add(bar);
320
321             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
322
323             assertTrue(knownModules.contains("bar"));
324
325             assertEquals(2, knownModules.size());
326
327         }};
328
329     }
330
331
332     @Test
333     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
334             throws Exception {
335         new JavaTestKit(system) {{
336             final Props props = ShardManager
337                     .props("config", new MockClusterWrapper(),
338                             new MockConfiguration(), DatastoreContext.newBuilder().build());
339             final TestActorRef<ShardManager> subject =
340                     TestActorRef.create(system, props);
341
342             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
343
344             assertEquals(0, knownModules.size());
345
346             SchemaContext schemaContext = mock(SchemaContext.class);
347             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
348
349             ModuleIdentifier foo = mock(ModuleIdentifier.class);
350             when(foo.getNamespace()).thenReturn(new URI("foo"));
351
352             moduleIdentifierSet.add(foo);
353
354             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
355
356             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
357
358             assertTrue(knownModules.contains("foo"));
359
360             assertEquals(1, knownModules.size());
361
362             //Create a completely different SchemaContext with only the bar module in it
363             schemaContext = mock(SchemaContext.class);
364             moduleIdentifierSet = new HashSet<>();
365             ModuleIdentifier bar = mock(ModuleIdentifier.class);
366             when(bar.getNamespace()).thenReturn(new URI("bar"));
367
368             moduleIdentifierSet.add(bar);
369
370             subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
371
372             assertFalse(knownModules.contains("bar"));
373
374             assertEquals(1, knownModules.size());
375
376         }};
377
378     }
379
380
381     private void sleep(long period){
382         Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
383     }
384
385     public static class MyJournal extends AsyncWriteJournal {
386
387         private static Map<Long, Object> journal = Maps.newTreeMap();
388
389         public static void addToJournal(Long sequenceNr, Object value){
390             journal.put(sequenceNr, value);
391         }
392
393         public static Map<Long, Object> get(){
394             return journal;
395         }
396
397         public static void clear(){
398             journal.clear();
399         }
400
401         @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
402                                                             final Procedure<PersistentRepr> replayCallback) {
403             if(journal.size() == 0){
404                 return Futures.successful(null);
405             }
406             return Futures.future(new Callable<Void>() {
407                 @Override
408                 public Void call() throws Exception {
409                     for (Map.Entry<Long, Object> entry : journal.entrySet()) {
410                         PersistentRepr persistentMessage =
411                                 new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
412                                         false, null, null);
413                         replayCallback.apply(persistentMessage);
414                     }
415                     return null;
416                 }
417             }, context().dispatcher());
418         }
419
420         @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
421             return Futures.successful(-1L);
422         }
423
424         @Override public Future<Void> doAsyncWriteMessages(
425                 final Iterable<PersistentRepr> persistentReprs) {
426             return Futures.future(new Callable<Void>() {
427                 @Override
428                 public Void call() throws Exception {
429                     for (PersistentRepr repr : persistentReprs){
430                         if(repr.payload() instanceof ShardManager.SchemaContextModules) {
431                             journal.put(repr.sequenceNr(), repr.payload());
432                         }
433                     }
434                     return null;
435                 }
436             }, context().dispatcher());
437         }
438
439         @Override public Future<Void> doAsyncWriteConfirmations(
440                 Iterable<PersistentConfirmation> persistentConfirmations) {
441             return Futures.successful(null);
442         }
443
444         @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
445                                                             boolean b) {
446             clear();
447             return Futures.successful(null);
448         }
449
450         @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
451             clear();
452             return Futures.successful(null);
453         }
454     }
455 }