e70d79c61a884abd7cef597b0bc43d3781b49eea
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.japi.Creator;
6 import akka.pattern.Patterns;
7 import akka.persistence.RecoveryCompleted;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import akka.util.Timeout;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.DataPersistenceProvider;
18 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
29 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37
38 import java.net.URI;
39 import java.util.Collection;
40 import java.util.HashSet;
41 import java.util.Map;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.mock;
50 import static org.mockito.Mockito.when;
51
52 public class ShardManagerTest extends AbstractActorTest {
53     private static int ID_COUNTER = 1;
54
55     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
56     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
57
58     private static ActorRef mockShardActor;
59
60     @Before
61     public void setUp() {
62         InMemoryJournal.clear();
63
64         if(mockShardActor == null) {
65             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
66             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
67         }
68     }
69
70     @After
71     public void tearDown() {
72         InMemoryJournal.clear();
73     }
74
75     private Props newShardMgrProps() {
76         return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
77                 DatastoreContext.newBuilder().build());
78     }
79
80     @Test
81     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
82         new JavaTestKit(getSystem()) {{
83             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
84
85             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
86
87             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
88
89             expectMsgEquals(duration("5 seconds"),
90                     new PrimaryNotFound("non-existent").toSerializable());
91         }};
92     }
93
94     @Test
95     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
96         new JavaTestKit(getSystem()) {{
97             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
98
99             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
100             shardManager.tell(new ActorInitialized(), mockShardActor);
101
102             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
103
104             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
105         }};
106     }
107
108     @Test
109     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
110         new JavaTestKit(getSystem()) {{
111             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
112
113             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
114
115             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
116
117             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
118         }};
119     }
120
121     @Test
122     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
123         new JavaTestKit(getSystem()) {{
124             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
125
126             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
127
128             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
129             // delayed until we send ActorInitialized.
130             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
131                     new Timeout(5, TimeUnit.SECONDS));
132
133             shardManager.tell(new ActorInitialized(), mockShardActor);
134
135             Object resp = Await.result(future, duration("5 seconds"));
136             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
137         }};
138     }
139
140     @Test
141     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
142         new JavaTestKit(getSystem()) {{
143             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
144
145             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
146
147             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
148
149             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
150
151             assertEquals("getShardName", "non-existent", notFound.getShardName());
152         }};
153     }
154
155     @Test
156     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
157         new JavaTestKit(getSystem()) {{
158             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
159
160             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
161             shardManager.tell(new ActorInitialized(), mockShardActor);
162
163             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
164
165             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
166
167             assertTrue("Found path contains " + found.getPath().path().toString(),
168                     found.getPath().path().toString().contains("member-1-shard-default-config"));
169         }};
170     }
171
172     @Test
173     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
174         new JavaTestKit(getSystem()) {{
175             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
176
177             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
178
179             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
180
181             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
182         }};
183     }
184
185     @Test
186     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
187         new JavaTestKit(getSystem()) {{
188             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
189
190             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
191
192             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
193             // delayed until we send ActorInitialized.
194             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
195                     new Timeout(5, TimeUnit.SECONDS));
196
197             shardManager.tell(new ActorInitialized(), mockShardActor);
198
199             Object resp = Await.result(future, duration("5 seconds"));
200             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
201         }};
202     }
203
204     @Test
205     public void testOnReceiveMemberUp() throws Exception {
206         new JavaTestKit(getSystem()) {{
207             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
208
209             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
210
211             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
212
213             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
214                     PrimaryFound.SERIALIZABLE_CLASS));
215             String path = found.getPrimaryPath();
216             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
217         }};
218     }
219
220     @Test
221     public void testOnReceiveMemberDown() throws Exception {
222
223         new JavaTestKit(getSystem()) {{
224             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
225
226             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
227
228             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
229
230             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
231
232             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
233
234             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
235
236             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
237         }};
238     }
239
240     @Test
241     public void testOnRecoveryJournalIsCleaned() {
242         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
243                 ImmutableSet.of("foo")));
244         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
245                 ImmutableSet.of("bar")));
246         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
247
248         new JavaTestKit(getSystem()) {{
249             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
250                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
251
252             shardManager.underlyingActor().waitForRecoveryComplete();
253             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
254
255             // Journal entries up to the last one should've been deleted
256             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
257             synchronized (journal) {
258                 assertEquals("Journal size", 1, journal.size());
259                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
260             }
261         }};
262     }
263
264     @Test
265     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
266         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
267         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
268                 persistedModules));
269         new JavaTestKit(getSystem()) {{
270             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
271                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
272
273             shardManager.underlyingActor().waitForRecoveryComplete();
274
275             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
276
277             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
278         }};
279     }
280
281     @Test
282     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
283             throws Exception {
284         new JavaTestKit(getSystem()) {{
285             final TestActorRef<ShardManager> shardManager =
286                     TestActorRef.create(getSystem(), newShardMgrProps());
287
288             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
289
290             ModuleIdentifier foo = mock(ModuleIdentifier.class);
291             when(foo.getNamespace()).thenReturn(new URI("foo"));
292
293             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
294             moduleIdentifierSet.add(foo);
295
296             SchemaContext schemaContext = mock(SchemaContext.class);
297             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
298
299             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
300
301             assertEquals("getKnownModules", Sets.newHashSet("foo"),
302                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
303
304             ModuleIdentifier bar = mock(ModuleIdentifier.class);
305             when(bar.getNamespace()).thenReturn(new URI("bar"));
306
307             moduleIdentifierSet.add(bar);
308
309             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
310
311             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
312                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
313         }};
314     }
315
316     @Test
317     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
318             throws Exception {
319         new JavaTestKit(getSystem()) {{
320             final TestActorRef<ShardManager> shardManager =
321                     TestActorRef.create(getSystem(), newShardMgrProps());
322
323             SchemaContext schemaContext = mock(SchemaContext.class);
324             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
325
326             ModuleIdentifier foo = mock(ModuleIdentifier.class);
327             when(foo.getNamespace()).thenReturn(new URI("foo"));
328
329             moduleIdentifierSet.add(foo);
330
331             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
332
333             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
334
335             assertEquals("getKnownModules", Sets.newHashSet("foo"),
336                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
337
338             //Create a completely different SchemaContext with only the bar module in it
339             //schemaContext = mock(SchemaContext.class);
340             moduleIdentifierSet.clear();
341             ModuleIdentifier bar = mock(ModuleIdentifier.class);
342             when(bar.getNamespace()).thenReturn(new URI("bar"));
343
344             moduleIdentifierSet.add(bar);
345
346             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
347
348             assertEquals("getKnownModules", Sets.newHashSet("foo"),
349                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
350
351         }};
352     }
353
354     @Test
355     public void testRecoveryApplicable(){
356         new JavaTestKit(getSystem()) {
357             {
358                 final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
359                         new MockClusterWrapper(),
360                         new MockConfiguration(),
361                         DatastoreContext.newBuilder().persistent(true).build());
362                 final TestActorRef<ShardManager> persistentShardManager =
363                         TestActorRef.create(getSystem(), persistentProps);
364
365                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
366
367                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
368
369                 final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
370                         new MockClusterWrapper(),
371                         new MockConfiguration(),
372                         DatastoreContext.newBuilder().persistent(false).build());
373                 final TestActorRef<ShardManager> nonPersistentShardManager =
374                         TestActorRef.create(getSystem(), nonPersistentProps);
375
376                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
377
378                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
379
380
381             }};
382
383     }
384
385     @Test
386     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
387             throws Exception {
388         final CountDownLatch persistLatch = new CountDownLatch(1);
389         final Creator<ShardManager> creator = new Creator<ShardManager>() {
390             private static final long serialVersionUID = 1L;
391             @Override
392             public ShardManager create() throws Exception {
393                 return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
394                     @Override
395                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
396                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
397                                 = new DataPersistenceProviderMonitor();
398                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
399                         return dataPersistenceProviderMonitor;
400                     }
401                 };
402             }
403         };
404
405         new JavaTestKit(getSystem()) {{
406
407             final TestActorRef<ShardManager> shardManager =
408                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
409
410             ModuleIdentifier foo = mock(ModuleIdentifier.class);
411             when(foo.getNamespace()).thenReturn(new URI("foo"));
412
413             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
414             moduleIdentifierSet.add(foo);
415
416             SchemaContext schemaContext = mock(SchemaContext.class);
417             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
418
419             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
420
421             assertEquals("Persisted", true,
422                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
423
424         }};
425     }
426
427
428
429     private static class TestShardManager extends ShardManager {
430         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
431
432         TestShardManager(String shardMrgIDSuffix) {
433             super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
434                     DatastoreContext.newBuilder().build());
435         }
436
437         @Override
438         public void handleRecover(Object message) throws Exception {
439             try {
440                 super.handleRecover(message);
441             } finally {
442                 if(message instanceof RecoveryCompleted) {
443                     recoveryComplete.countDown();
444                 }
445             }
446         }
447
448         void waitForRecoveryComplete() {
449             assertEquals("Recovery complete", true,
450                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
451         }
452     }
453
454     @SuppressWarnings("serial")
455     static class TestShardManagerCreator implements Creator<TestShardManager> {
456         String shardMrgIDSuffix;
457
458         TestShardManagerCreator(String shardMrgIDSuffix) {
459             this.shardMrgIDSuffix = shardMrgIDSuffix;
460         }
461
462         @Override
463         public TestShardManager create() throws Exception {
464             return new TestShardManager(shardMrgIDSuffix);
465         }
466
467     }
468
469     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
470         private static final long serialVersionUID = 1L;
471         private Creator<ShardManager> delegate;
472
473         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
474             this.delegate = delegate;
475         }
476
477         @Override
478         public ShardManager create() throws Exception {
479             return delegate.create();
480         }
481     }
482 }