Do not duplicate OSGi dependencyManagement
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.japi.Creator;
6 import akka.pattern.Patterns;
7 import akka.persistence.RecoveryCompleted;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import akka.util.Timeout;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.DataPersistenceProvider;
18 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
29 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37
38 import java.net.URI;
39 import java.util.Collection;
40 import java.util.HashSet;
41 import java.util.Map;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.mock;
50 import static org.mockito.Mockito.when;
51
52 public class ShardManagerTest extends AbstractActorTest {
53     private static int ID_COUNTER = 1;
54
55     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
56     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
57
58     private static ActorRef mockShardActor;
59
60     @Before
61     public void setUp() {
62         InMemoryJournal.clear();
63
64         if(mockShardActor == null) {
65             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
66             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
67         }
68     }
69
70     @After
71     public void tearDown() {
72         InMemoryJournal.clear();
73     }
74
75     private Props newShardMgrProps() {
76         return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
77                 DatastoreContext.newBuilder().build());
78     }
79
80     @Test
81     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
82         new JavaTestKit(getSystem()) {{
83             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
84
85             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
86
87             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
88
89             expectMsgEquals(duration("5 seconds"),
90                     new PrimaryNotFound("non-existent").toSerializable());
91         }};
92     }
93
94     @Test
95     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
96         new JavaTestKit(getSystem()) {{
97             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
98
99             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
100             shardManager.tell(new ActorInitialized(), mockShardActor);
101
102             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
103
104             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
105         }};
106     }
107
108     @Test
109     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
110         new JavaTestKit(getSystem()) {{
111             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
112
113             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
114
115             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
116         }};
117     }
118
119     @Test
120     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
121         new JavaTestKit(getSystem()) {{
122             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
123
124             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
125
126             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
127             // delayed until we send ActorInitialized.
128             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
129                     new Timeout(5, TimeUnit.SECONDS));
130
131             shardManager.tell(new ActorInitialized(), mockShardActor);
132
133             Object resp = Await.result(future, duration("5 seconds"));
134             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
135         }};
136     }
137
138     @Test
139     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
140         new JavaTestKit(getSystem()) {{
141             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
142
143             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
144
145             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
146
147             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
148
149             assertEquals("getShardName", "non-existent", notFound.getShardName());
150         }};
151     }
152
153     @Test
154     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
155         new JavaTestKit(getSystem()) {{
156             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
157
158             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159             shardManager.tell(new ActorInitialized(), mockShardActor);
160
161             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
162
163             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
164
165             assertTrue("Found path contains " + found.getPath().path().toString(),
166                     found.getPath().path().toString().contains("member-1-shard-default-config"));
167         }};
168     }
169
170     @Test
171     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
172         new JavaTestKit(getSystem()) {{
173             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
174
175             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
176
177             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
178         }};
179     }
180
181     @Test
182     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
185
186             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
187
188             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
189             // delayed until we send ActorInitialized.
190             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
191                     new Timeout(5, TimeUnit.SECONDS));
192
193             shardManager.tell(new ActorInitialized(), mockShardActor);
194
195             Object resp = Await.result(future, duration("5 seconds"));
196             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
197         }};
198     }
199
200     @Test
201     public void testOnReceiveMemberUp() throws Exception {
202         new JavaTestKit(getSystem()) {{
203             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
204
205             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
206
207             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
208
209             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
210                     PrimaryFound.SERIALIZABLE_CLASS));
211             String path = found.getPrimaryPath();
212             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
213         }};
214     }
215
216     @Test
217     public void testOnReceiveMemberDown() throws Exception {
218
219         new JavaTestKit(getSystem()) {{
220             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
221
222             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
223
224             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
225
226             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
227
228             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
229
230             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
231
232             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
233         }};
234     }
235
236     @Test
237     public void testOnRecoveryJournalIsCleaned() {
238         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
239                 ImmutableSet.of("foo")));
240         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
241                 ImmutableSet.of("bar")));
242         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
243
244         new JavaTestKit(getSystem()) {{
245             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
246                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
247
248             shardManager.underlyingActor().waitForRecoveryComplete();
249             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
250
251             // Journal entries up to the last one should've been deleted
252             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
253             synchronized (journal) {
254                 assertEquals("Journal size", 1, journal.size());
255                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
256             }
257         }};
258     }
259
260     @Test
261     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
262         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
263         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
264                 persistedModules));
265         new JavaTestKit(getSystem()) {{
266             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
267                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
268
269             shardManager.underlyingActor().waitForRecoveryComplete();
270
271             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
272
273             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
274         }};
275     }
276
277     @Test
278     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
279             throws Exception {
280         new JavaTestKit(getSystem()) {{
281             final TestActorRef<ShardManager> shardManager =
282                     TestActorRef.create(getSystem(), newShardMgrProps());
283
284             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
285
286             ModuleIdentifier foo = mock(ModuleIdentifier.class);
287             when(foo.getNamespace()).thenReturn(new URI("foo"));
288
289             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290             moduleIdentifierSet.add(foo);
291
292             SchemaContext schemaContext = mock(SchemaContext.class);
293             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
294
295             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
296
297             assertEquals("getKnownModules", Sets.newHashSet("foo"),
298                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
299
300             ModuleIdentifier bar = mock(ModuleIdentifier.class);
301             when(bar.getNamespace()).thenReturn(new URI("bar"));
302
303             moduleIdentifierSet.add(bar);
304
305             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
306
307             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
308                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
309         }};
310     }
311
312     @Test
313     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
314             throws Exception {
315         new JavaTestKit(getSystem()) {{
316             final TestActorRef<ShardManager> shardManager =
317                     TestActorRef.create(getSystem(), newShardMgrProps());
318
319             SchemaContext schemaContext = mock(SchemaContext.class);
320             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
321
322             ModuleIdentifier foo = mock(ModuleIdentifier.class);
323             when(foo.getNamespace()).thenReturn(new URI("foo"));
324
325             moduleIdentifierSet.add(foo);
326
327             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
328
329             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
330
331             assertEquals("getKnownModules", Sets.newHashSet("foo"),
332                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
333
334             //Create a completely different SchemaContext with only the bar module in it
335             //schemaContext = mock(SchemaContext.class);
336             moduleIdentifierSet.clear();
337             ModuleIdentifier bar = mock(ModuleIdentifier.class);
338             when(bar.getNamespace()).thenReturn(new URI("bar"));
339
340             moduleIdentifierSet.add(bar);
341
342             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
343
344             assertEquals("getKnownModules", Sets.newHashSet("foo"),
345                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
346
347         }};
348     }
349
350     @Test
351     public void testRecoveryApplicable(){
352         new JavaTestKit(getSystem()) {
353             {
354                 final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
355                         new MockClusterWrapper(),
356                         new MockConfiguration(),
357                         DatastoreContext.newBuilder().persistent(true).build());
358                 final TestActorRef<ShardManager> persistentShardManager =
359                         TestActorRef.create(getSystem(), persistentProps);
360
361                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
362
363                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
364
365                 final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
366                         new MockClusterWrapper(),
367                         new MockConfiguration(),
368                         DatastoreContext.newBuilder().persistent(false).build());
369                 final TestActorRef<ShardManager> nonPersistentShardManager =
370                         TestActorRef.create(getSystem(), nonPersistentProps);
371
372                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
373
374                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
375
376
377             }};
378
379     }
380
381     @Test
382     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
383             throws Exception {
384         final CountDownLatch persistLatch = new CountDownLatch(1);
385         final Creator<ShardManager> creator = new Creator<ShardManager>() {
386             private static final long serialVersionUID = 1L;
387             @Override
388             public ShardManager create() throws Exception {
389                 return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
390                     @Override
391                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
392                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
393                                 = new DataPersistenceProviderMonitor();
394                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
395                         return dataPersistenceProviderMonitor;
396                     }
397                 };
398             }
399         };
400
401         new JavaTestKit(getSystem()) {{
402
403             final TestActorRef<ShardManager> shardManager =
404                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
405
406             ModuleIdentifier foo = mock(ModuleIdentifier.class);
407             when(foo.getNamespace()).thenReturn(new URI("foo"));
408
409             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
410             moduleIdentifierSet.add(foo);
411
412             SchemaContext schemaContext = mock(SchemaContext.class);
413             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
414
415             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
416
417             assertEquals("Persisted", true,
418                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
419
420         }};
421     }
422
423
424
425     private static class TestShardManager extends ShardManager {
426         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
427
428         TestShardManager(String shardMrgIDSuffix) {
429             super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
430                     DatastoreContext.newBuilder().build());
431         }
432
433         @Override
434         public void handleRecover(Object message) throws Exception {
435             try {
436                 super.handleRecover(message);
437             } finally {
438                 if(message instanceof RecoveryCompleted) {
439                     recoveryComplete.countDown();
440                 }
441             }
442         }
443
444         void waitForRecoveryComplete() {
445             assertEquals("Recovery complete", true,
446                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
447         }
448     }
449
450     @SuppressWarnings("serial")
451     static class TestShardManagerCreator implements Creator<TestShardManager> {
452         String shardMrgIDSuffix;
453
454         TestShardManagerCreator(String shardMrgIDSuffix) {
455             this.shardMrgIDSuffix = shardMrgIDSuffix;
456         }
457
458         @Override
459         public TestShardManager create() throws Exception {
460             return new TestShardManager(shardMrgIDSuffix);
461         }
462
463     }
464
465     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
466         private static final long serialVersionUID = 1L;
467         private Creator<ShardManager> delegate;
468
469         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
470             this.delegate = delegate;
471         }
472
473         @Override
474         public ShardManager create() throws Exception {
475             return delegate.create();
476         }
477     }
478 }