Merge changes If17aefba,I36d0f5ec
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.pattern.Patterns;
15 import akka.persistence.RecoveryCompleted;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestActorRef;
18 import akka.util.Timeout;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Sets;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.net.URI;
23 import java.util.Collection;
24 import java.util.HashSet;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.DataPersistenceProvider;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
37 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
42 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
45 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
46 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
48 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
49 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import scala.concurrent.Await;
55 import scala.concurrent.Future;
56
57 public class ShardManagerTest extends AbstractActorTest {
58     private static int ID_COUNTER = 1;
59
60     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
61     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
62
63     @Mock
64     private static CountDownLatch ready;
65
66     private static ActorRef mockShardActor;
67
68     @Before
69     public void setUp() {
70         MockitoAnnotations.initMocks(this);
71
72         InMemoryJournal.clear();
73
74         if(mockShardActor == null) {
75             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
76             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
77         }
78     }
79
80     @After
81     public void tearDown() {
82         InMemoryJournal.clear();
83     }
84
85     private Props newShardMgrProps() {
86         DatastoreContext.Builder builder = DatastoreContext.newBuilder();
87         builder.dataStoreType(shardMrgIDSuffix);
88         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
89                 builder.build(), ready);
90     }
91
92     @Test
93     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
94         new JavaTestKit(getSystem()) {{
95             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
96
97             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
98
99             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
100
101             expectMsgEquals(duration("5 seconds"),
102                     new PrimaryNotFound("non-existent").toSerializable());
103         }};
104     }
105
106     @Test
107     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
108         new JavaTestKit(getSystem()) {{
109             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
110
111             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
112             shardManager.tell(new ActorInitialized(), mockShardActor);
113
114             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
115
116             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
117         }};
118     }
119
120     @Test
121     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
122         new JavaTestKit(getSystem()) {{
123             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
124
125             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
126
127             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
128         }};
129     }
130
131     @Test
132     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
133         new JavaTestKit(getSystem()) {{
134             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
135
136             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
137
138             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
139             // delayed until we send ActorInitialized.
140             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
141                     new Timeout(5, TimeUnit.SECONDS));
142
143             shardManager.tell(new ActorInitialized(), mockShardActor);
144
145             Object resp = Await.result(future, duration("5 seconds"));
146             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
147         }};
148     }
149
150     @Test
151     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
152         new JavaTestKit(getSystem()) {{
153             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
154
155             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
156
157             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
158
159             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
160
161             assertEquals("getShardName", "non-existent", notFound.getShardName());
162         }};
163     }
164
165     @Test
166     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
167         new JavaTestKit(getSystem()) {{
168             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
169
170             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
171             shardManager.tell(new ActorInitialized(), mockShardActor);
172
173             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
174
175             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
176
177             assertTrue("Found path contains " + found.getPath().path().toString(),
178                     found.getPath().path().toString().contains("member-1-shard-default-config"));
179         }};
180     }
181
182     @Test
183     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
184         new JavaTestKit(getSystem()) {{
185             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
186
187             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
188
189             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
190         }};
191     }
192
193     @Test
194     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
195         new JavaTestKit(getSystem()) {{
196             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
197
198             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
199
200             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
201             // delayed until we send ActorInitialized.
202             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
203                     new Timeout(5, TimeUnit.SECONDS));
204
205             shardManager.tell(new ActorInitialized(), mockShardActor);
206
207             Object resp = Await.result(future, duration("5 seconds"));
208             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
209         }};
210     }
211
212     @Test
213     public void testOnReceiveMemberUp() throws Exception {
214         new JavaTestKit(getSystem()) {{
215             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
216
217             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
218
219             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
220
221             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
222                     PrimaryFound.SERIALIZABLE_CLASS));
223             String path = found.getPrimaryPath();
224             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
225         }};
226     }
227
228     @Test
229     public void testOnReceiveMemberDown() throws Exception {
230
231         new JavaTestKit(getSystem()) {{
232             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
233
234             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
235
236             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
237
238             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
239
240             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
241
242             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
243
244             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
245         }};
246     }
247
248     @Test
249     public void testOnRecoveryJournalIsCleaned() {
250         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
251                 ImmutableSet.of("foo")));
252         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
253                 ImmutableSet.of("bar")));
254         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
255
256         new JavaTestKit(getSystem()) {{
257             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
258                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
259
260             shardManager.underlyingActor().waitForRecoveryComplete();
261             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
262
263             // Journal entries up to the last one should've been deleted
264             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
265             synchronized (journal) {
266                 assertEquals("Journal size", 1, journal.size());
267                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
268             }
269         }};
270     }
271
272     @Test
273     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
274         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
275         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
276                 persistedModules));
277         new JavaTestKit(getSystem()) {{
278             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
279                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
280
281             shardManager.underlyingActor().waitForRecoveryComplete();
282
283             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
284
285             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
286         }};
287     }
288
289     @Test
290     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
291             throws Exception {
292         new JavaTestKit(getSystem()) {{
293             final TestActorRef<ShardManager> shardManager =
294                     TestActorRef.create(getSystem(), newShardMgrProps());
295
296             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
297
298             ModuleIdentifier foo = mock(ModuleIdentifier.class);
299             when(foo.getNamespace()).thenReturn(new URI("foo"));
300
301             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
302             moduleIdentifierSet.add(foo);
303
304             SchemaContext schemaContext = mock(SchemaContext.class);
305             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
306
307             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
308
309             assertEquals("getKnownModules", Sets.newHashSet("foo"),
310                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
311
312             ModuleIdentifier bar = mock(ModuleIdentifier.class);
313             when(bar.getNamespace()).thenReturn(new URI("bar"));
314
315             moduleIdentifierSet.add(bar);
316
317             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
318
319             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
320                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
321         }};
322     }
323
324     @Test
325     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
326             throws Exception {
327         new JavaTestKit(getSystem()) {{
328             final TestActorRef<ShardManager> shardManager =
329                     TestActorRef.create(getSystem(), newShardMgrProps());
330
331             SchemaContext schemaContext = mock(SchemaContext.class);
332             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
333
334             ModuleIdentifier foo = mock(ModuleIdentifier.class);
335             when(foo.getNamespace()).thenReturn(new URI("foo"));
336
337             moduleIdentifierSet.add(foo);
338
339             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
340
341             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
342
343             assertEquals("getKnownModules", Sets.newHashSet("foo"),
344                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
345
346             //Create a completely different SchemaContext with only the bar module in it
347             //schemaContext = mock(SchemaContext.class);
348             moduleIdentifierSet.clear();
349             ModuleIdentifier bar = mock(ModuleIdentifier.class);
350             when(bar.getNamespace()).thenReturn(new URI("bar"));
351
352             moduleIdentifierSet.add(bar);
353
354             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
355
356             assertEquals("getKnownModules", Sets.newHashSet("foo"),
357                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
358
359         }};
360     }
361
362     @Test
363     public void testRecoveryApplicable(){
364         new JavaTestKit(getSystem()) {
365             {
366                 final Props persistentProps = ShardManager.props(
367                         new MockClusterWrapper(),
368                         new MockConfiguration(),
369                         DatastoreContext.newBuilder().persistent(true).build(), ready);
370                 final TestActorRef<ShardManager> persistentShardManager =
371                         TestActorRef.create(getSystem(), persistentProps);
372
373                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
374
375                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
376
377                 final Props nonPersistentProps = ShardManager.props(
378                         new MockClusterWrapper(),
379                         new MockConfiguration(),
380                         DatastoreContext.newBuilder().persistent(false).build(), ready);
381                 final TestActorRef<ShardManager> nonPersistentShardManager =
382                         TestActorRef.create(getSystem(), nonPersistentProps);
383
384                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
385
386                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
387
388
389             }};
390
391     }
392
393     @Test
394     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
395             throws Exception {
396         final CountDownLatch persistLatch = new CountDownLatch(1);
397         final Creator<ShardManager> creator = new Creator<ShardManager>() {
398             private static final long serialVersionUID = 1L;
399             @Override
400             public ShardManager create() throws Exception {
401                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
402                     @Override
403                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
404                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
405                                 = new DataPersistenceProviderMonitor();
406                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
407                         return dataPersistenceProviderMonitor;
408                     }
409                 };
410             }
411         };
412
413         new JavaTestKit(getSystem()) {{
414
415             final TestActorRef<ShardManager> shardManager =
416                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
417
418             ModuleIdentifier foo = mock(ModuleIdentifier.class);
419             when(foo.getNamespace()).thenReturn(new URI("foo"));
420
421             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
422             moduleIdentifierSet.add(foo);
423
424             SchemaContext schemaContext = mock(SchemaContext.class);
425             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
426
427             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
428
429             assertEquals("Persisted", true,
430                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
431
432         }};
433     }
434
435     @Test
436     public void testRoleChangeNotificationReleaseReady() throws Exception {
437         new JavaTestKit(getSystem()) {
438             {
439                 final Props persistentProps = ShardManager.props(
440                         new MockClusterWrapper(),
441                         new MockConfiguration(),
442                         DatastoreContext.newBuilder().persistent(true).build(), ready);
443                 final TestActorRef<ShardManager> shardManager =
444                         TestActorRef.create(getSystem(), persistentProps);
445
446                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
447
448                 verify(ready, times(1)).countDown();
449
450             }};
451     }
452
453     @Test
454     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
455         new JavaTestKit(getSystem()) {
456             {
457                 final Props persistentProps = ShardManager.props(
458                         new MockClusterWrapper(),
459                         new MockConfiguration(),
460                         DatastoreContext.newBuilder().persistent(true).build(), ready);
461                 final TestActorRef<ShardManager> shardManager =
462                         TestActorRef.create(getSystem(), persistentProps);
463
464                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
465
466                 verify(ready, never()).countDown();
467
468             }};
469     }
470
471
472
473     private static class TestShardManager extends ShardManager {
474         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
475
476         TestShardManager(String shardMrgIDSuffix) {
477             super(new MockClusterWrapper(), new MockConfiguration(),
478                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
479         }
480
481         @Override
482         public void handleRecover(Object message) throws Exception {
483             try {
484                 super.handleRecover(message);
485             } finally {
486                 if(message instanceof RecoveryCompleted) {
487                     recoveryComplete.countDown();
488                 }
489             }
490         }
491
492         void waitForRecoveryComplete() {
493             assertEquals("Recovery complete", true,
494                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
495         }
496     }
497
498     @SuppressWarnings("serial")
499     static class TestShardManagerCreator implements Creator<TestShardManager> {
500         String shardMrgIDSuffix;
501
502         TestShardManagerCreator(String shardMrgIDSuffix) {
503             this.shardMrgIDSuffix = shardMrgIDSuffix;
504         }
505
506         @Override
507         public TestShardManager create() throws Exception {
508             return new TestShardManager(shardMrgIDSuffix);
509         }
510
511     }
512
513     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
514         private static final long serialVersionUID = 1L;
515         private Creator<ShardManager> delegate;
516
517         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
518             this.delegate = delegate;
519         }
520
521         @Override
522         public ShardManager create() throws Exception {
523             return delegate.create();
524         }
525     }
526 }