Merge "Startup archetype: remove 'Impl' from config subsystem Module name."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.when;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.japi.Creator;
11 import akka.pattern.Patterns;
12 import akka.persistence.RecoveryCompleted;
13 import akka.testkit.JavaTestKit;
14 import akka.testkit.TestActorRef;
15 import akka.util.Timeout;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Sets;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.net.URI;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
31 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
32 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
33 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
34 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
36 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
37 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
38 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
40 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
41 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
42 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import scala.concurrent.Await;
48 import scala.concurrent.Future;
49
50 public class ShardManagerTest extends AbstractActorTest {
51     private static int ID_COUNTER = 1;
52
53     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
54     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
55
56     private static ActorRef mockShardActor;
57
58     @Before
59     public void setUp() {
60         InMemoryJournal.clear();
61
62         if(mockShardActor == null) {
63             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
64             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
65         }
66     }
67
68     @After
69     public void tearDown() {
70         InMemoryJournal.clear();
71     }
72
73     private Props newShardMgrProps() {
74
75         DatastoreContext.Builder builder = DatastoreContext.newBuilder();
76         builder.dataStoreType(shardMrgIDSuffix);
77         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
78     }
79
80     @Test
81     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
82         new JavaTestKit(getSystem()) {{
83             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
84
85             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
86
87             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
88
89             expectMsgEquals(duration("5 seconds"),
90                     new PrimaryNotFound("non-existent").toSerializable());
91         }};
92     }
93
94     @Test
95     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
96         new JavaTestKit(getSystem()) {{
97             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
98
99             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
100             shardManager.tell(new ActorInitialized(), mockShardActor);
101
102             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
103
104             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
105         }};
106     }
107
108     @Test
109     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
110         new JavaTestKit(getSystem()) {{
111             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
112
113             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
114
115             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
116         }};
117     }
118
119     @Test
120     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
121         new JavaTestKit(getSystem()) {{
122             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
123
124             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
125
126             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
127             // delayed until we send ActorInitialized.
128             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
129                     new Timeout(5, TimeUnit.SECONDS));
130
131             shardManager.tell(new ActorInitialized(), mockShardActor);
132
133             Object resp = Await.result(future, duration("5 seconds"));
134             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
135         }};
136     }
137
138     @Test
139     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
140         new JavaTestKit(getSystem()) {{
141             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
142
143             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
144
145             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
146
147             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
148
149             assertEquals("getShardName", "non-existent", notFound.getShardName());
150         }};
151     }
152
153     @Test
154     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
155         new JavaTestKit(getSystem()) {{
156             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
157
158             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159             shardManager.tell(new ActorInitialized(), mockShardActor);
160
161             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
162
163             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
164
165             assertTrue("Found path contains " + found.getPath().path().toString(),
166                     found.getPath().path().toString().contains("member-1-shard-default-config"));
167         }};
168     }
169
170     @Test
171     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
172         new JavaTestKit(getSystem()) {{
173             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
174
175             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
176
177             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
178         }};
179     }
180
181     @Test
182     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
185
186             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
187
188             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
189             // delayed until we send ActorInitialized.
190             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
191                     new Timeout(5, TimeUnit.SECONDS));
192
193             shardManager.tell(new ActorInitialized(), mockShardActor);
194
195             Object resp = Await.result(future, duration("5 seconds"));
196             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
197         }};
198     }
199
200     @Test
201     public void testOnReceiveMemberUp() throws Exception {
202         new JavaTestKit(getSystem()) {{
203             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
204
205             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
206
207             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
208
209             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
210                     PrimaryFound.SERIALIZABLE_CLASS));
211             String path = found.getPrimaryPath();
212             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
213         }};
214     }
215
216     @Test
217     public void testOnReceiveMemberDown() throws Exception {
218
219         new JavaTestKit(getSystem()) {{
220             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
221
222             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
223
224             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
225
226             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
227
228             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
229
230             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
231
232             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
233         }};
234     }
235
236     @Test
237     public void testOnRecoveryJournalIsCleaned() {
238         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
239                 ImmutableSet.of("foo")));
240         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
241                 ImmutableSet.of("bar")));
242         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
243
244         new JavaTestKit(getSystem()) {{
245             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
246                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
247
248             shardManager.underlyingActor().waitForRecoveryComplete();
249             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
250
251             // Journal entries up to the last one should've been deleted
252             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
253             synchronized (journal) {
254                 assertEquals("Journal size", 1, journal.size());
255                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
256             }
257         }};
258     }
259
260     @Test
261     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
262         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
263         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
264                 persistedModules));
265         new JavaTestKit(getSystem()) {{
266             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
267                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
268
269             shardManager.underlyingActor().waitForRecoveryComplete();
270
271             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
272
273             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
274         }};
275     }
276
277     @Test
278     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
279             throws Exception {
280         new JavaTestKit(getSystem()) {{
281             final TestActorRef<ShardManager> shardManager =
282                     TestActorRef.create(getSystem(), newShardMgrProps());
283
284             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
285
286             ModuleIdentifier foo = mock(ModuleIdentifier.class);
287             when(foo.getNamespace()).thenReturn(new URI("foo"));
288
289             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290             moduleIdentifierSet.add(foo);
291
292             SchemaContext schemaContext = mock(SchemaContext.class);
293             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
294
295             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
296
297             assertEquals("getKnownModules", Sets.newHashSet("foo"),
298                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
299
300             ModuleIdentifier bar = mock(ModuleIdentifier.class);
301             when(bar.getNamespace()).thenReturn(new URI("bar"));
302
303             moduleIdentifierSet.add(bar);
304
305             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
306
307             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
308                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
309         }};
310     }
311
312     @Test
313     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
314             throws Exception {
315         new JavaTestKit(getSystem()) {{
316             final TestActorRef<ShardManager> shardManager =
317                     TestActorRef.create(getSystem(), newShardMgrProps());
318
319             SchemaContext schemaContext = mock(SchemaContext.class);
320             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
321
322             ModuleIdentifier foo = mock(ModuleIdentifier.class);
323             when(foo.getNamespace()).thenReturn(new URI("foo"));
324
325             moduleIdentifierSet.add(foo);
326
327             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
328
329             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
330
331             assertEquals("getKnownModules", Sets.newHashSet("foo"),
332                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
333
334             //Create a completely different SchemaContext with only the bar module in it
335             //schemaContext = mock(SchemaContext.class);
336             moduleIdentifierSet.clear();
337             ModuleIdentifier bar = mock(ModuleIdentifier.class);
338             when(bar.getNamespace()).thenReturn(new URI("bar"));
339
340             moduleIdentifierSet.add(bar);
341
342             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
343
344             assertEquals("getKnownModules", Sets.newHashSet("foo"),
345                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
346
347         }};
348     }
349
350     @Test
351     public void testRecoveryApplicable(){
352         new JavaTestKit(getSystem()) {
353             {
354                 final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
355                         DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
356                 final TestActorRef<ShardManager> persistentShardManager =
357                         TestActorRef.create(getSystem(), persistentProps);
358
359                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
360
361                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
362
363                 final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
364                         DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
365                 final TestActorRef<ShardManager> nonPersistentShardManager =
366                         TestActorRef.create(getSystem(), nonPersistentProps);
367
368                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
369
370                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
371
372
373             }};
374
375     }
376
377     @Test
378     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
379             throws Exception {
380         final CountDownLatch persistLatch = new CountDownLatch(1);
381         final Creator<ShardManager> creator = new Creator<ShardManager>() {
382             private static final long serialVersionUID = 1L;
383             @Override
384             public ShardManager create() throws Exception {
385                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
386                         DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
387                     @Override
388                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
389                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
390                                 = new DataPersistenceProviderMonitor();
391                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
392                         return dataPersistenceProviderMonitor;
393                     }
394                 };
395             }
396         };
397
398         new JavaTestKit(getSystem()) {{
399
400             final TestActorRef<ShardManager> shardManager =
401                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
402
403             ModuleIdentifier foo = mock(ModuleIdentifier.class);
404             when(foo.getNamespace()).thenReturn(new URI("foo"));
405
406             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
407             moduleIdentifierSet.add(foo);
408
409             SchemaContext schemaContext = mock(SchemaContext.class);
410             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
411
412             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
413
414             assertEquals("Persisted", true,
415                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
416
417         }};
418     }
419
420
421
422     private static class TestShardManager extends ShardManager {
423         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
424
425         TestShardManager(String shardMrgIDSuffix) {
426             super(new MockClusterWrapper(), new MockConfiguration(),
427                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
428         }
429
430         @Override
431         public void handleRecover(Object message) throws Exception {
432             try {
433                 super.handleRecover(message);
434             } finally {
435                 if(message instanceof RecoveryCompleted) {
436                     recoveryComplete.countDown();
437                 }
438             }
439         }
440
441         void waitForRecoveryComplete() {
442             assertEquals("Recovery complete", true,
443                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
444         }
445     }
446
447     @SuppressWarnings("serial")
448     static class TestShardManagerCreator implements Creator<TestShardManager> {
449         String shardMrgIDSuffix;
450
451         TestShardManagerCreator(String shardMrgIDSuffix) {
452             this.shardMrgIDSuffix = shardMrgIDSuffix;
453         }
454
455         @Override
456         public TestShardManager create() throws Exception {
457             return new TestShardManager(shardMrgIDSuffix);
458         }
459
460     }
461
462     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
463         private static final long serialVersionUID = 1L;
464         private Creator<ShardManager> delegate;
465
466         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
467             this.delegate = delegate;
468         }
469
470         @Override
471         public ShardManager create() throws Exception {
472             return delegate.create();
473         }
474     }
475 }