Merge "BUG-2195 Enable features test for netconf connector"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.pattern.Patterns;
6 import akka.persistence.RecoveryCompleted;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import akka.util.Timeout;
10 import akka.japi.Creator;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
18 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
28 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
29 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
31 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
32 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import java.net.URI;
37 import java.util.Collection;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import static org.junit.Assert.assertEquals;
44 import static org.junit.Assert.assertTrue;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.when;
47
48 public class ShardManagerTest extends AbstractActorTest {
49     private static int ID_COUNTER = 1;
50
51     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
52     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
53
54     private static ActorRef mockShardActor;
55
56     @Before
57     public void setUp() {
58         InMemoryJournal.clear();
59
60         if(mockShardActor == null) {
61             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
62             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
63         }
64     }
65
66     @After
67     public void tearDown() {
68         InMemoryJournal.clear();
69     }
70
71     private Props newShardMgrProps() {
72         return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
73                 DatastoreContext.newBuilder().build());
74     }
75
76     @Test
77     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
78         new JavaTestKit(getSystem()) {{
79             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
80
81             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
82
83             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
84
85             expectMsgEquals(duration("5 seconds"),
86                     new PrimaryNotFound("non-existent").toSerializable());
87         }};
88     }
89
90     @Test
91     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
92         new JavaTestKit(getSystem()) {{
93             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
94
95             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
96             shardManager.tell(new ActorInitialized(), mockShardActor);
97
98             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
99
100             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
101         }};
102     }
103
104     @Test
105     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
106         new JavaTestKit(getSystem()) {{
107             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
108
109             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
110
111             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
112
113             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
114         }};
115     }
116
117     @Test
118     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
119         new JavaTestKit(getSystem()) {{
120             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
121
122             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
123
124             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
125             // delayed until we send ActorInitialized.
126             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
127                     new Timeout(5, TimeUnit.SECONDS));
128
129             shardManager.tell(new ActorInitialized(), mockShardActor);
130
131             Object resp = Await.result(future, duration("5 seconds"));
132             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
133         }};
134     }
135
136     @Test
137     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
138         new JavaTestKit(getSystem()) {{
139             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
140
141             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
142
143             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
144
145             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
146
147             assertEquals("getShardName", "non-existent", notFound.getShardName());
148         }};
149     }
150
151     @Test
152     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
153         new JavaTestKit(getSystem()) {{
154             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
155
156             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
157             shardManager.tell(new ActorInitialized(), mockShardActor);
158
159             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
160
161             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
162
163             assertTrue("Found path contains " + found.getPath().path().toString(),
164                     found.getPath().path().toString().contains("member-1-shard-default-config"));
165         }};
166     }
167
168     @Test
169     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
170         new JavaTestKit(getSystem()) {{
171             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
172
173             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
174
175             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
176
177             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
178         }};
179     }
180
181     @Test
182     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
185
186             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
187
188             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
189             // delayed until we send ActorInitialized.
190             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
191                     new Timeout(5, TimeUnit.SECONDS));
192
193             shardManager.tell(new ActorInitialized(), mockShardActor);
194
195             Object resp = Await.result(future, duration("5 seconds"));
196             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
197         }};
198     }
199
200     @Test
201     public void testOnReceiveMemberUp() throws Exception {
202         new JavaTestKit(getSystem()) {{
203             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
204
205             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
206
207             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
208
209             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
210                     PrimaryFound.SERIALIZABLE_CLASS));
211             String path = found.getPrimaryPath();
212             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
213         }};
214     }
215
216     @Test
217     public void testOnReceiveMemberDown() throws Exception {
218
219         new JavaTestKit(getSystem()) {{
220             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
221
222             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
223
224             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
225
226             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
227
228             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
229
230             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
231
232             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
233         }};
234     }
235
236     @Test
237     public void testOnRecoveryJournalIsCleaned() {
238         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
239                 ImmutableSet.of("foo")));
240         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
241                 ImmutableSet.of("bar")));
242         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
243
244         new JavaTestKit(getSystem()) {{
245             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
246                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
247
248             shardManager.underlyingActor().waitForRecoveryComplete();
249             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
250
251             // Journal entries up to the last one should've been deleted
252             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
253             synchronized (journal) {
254                 assertEquals("Journal size", 1, journal.size());
255                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
256             }
257         }};
258     }
259
260     @Test
261     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
262         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
263         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
264                 persistedModules));
265         new JavaTestKit(getSystem()) {{
266             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
267                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
268
269             shardManager.underlyingActor().waitForRecoveryComplete();
270
271             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
272
273             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
274         }};
275     }
276
277     @Test
278     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
279             throws Exception {
280         new JavaTestKit(getSystem()) {{
281             final TestActorRef<ShardManager> shardManager =
282                     TestActorRef.create(getSystem(), newShardMgrProps());
283
284             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
285
286             ModuleIdentifier foo = mock(ModuleIdentifier.class);
287             when(foo.getNamespace()).thenReturn(new URI("foo"));
288
289             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290             moduleIdentifierSet.add(foo);
291
292             SchemaContext schemaContext = mock(SchemaContext.class);
293             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
294
295             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
296
297             assertEquals("getKnownModules", Sets.newHashSet("foo"),
298                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
299
300             ModuleIdentifier bar = mock(ModuleIdentifier.class);
301             when(bar.getNamespace()).thenReturn(new URI("bar"));
302
303             moduleIdentifierSet.add(bar);
304
305             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
306
307             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
308                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
309         }};
310     }
311
312     @Test
313     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
314             throws Exception {
315         new JavaTestKit(getSystem()) {{
316             final TestActorRef<ShardManager> shardManager =
317                     TestActorRef.create(getSystem(), newShardMgrProps());
318
319             SchemaContext schemaContext = mock(SchemaContext.class);
320             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
321
322             ModuleIdentifier foo = mock(ModuleIdentifier.class);
323             when(foo.getNamespace()).thenReturn(new URI("foo"));
324
325             moduleIdentifierSet.add(foo);
326
327             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
328
329             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
330
331             assertEquals("getKnownModules", Sets.newHashSet("foo"),
332                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
333
334             //Create a completely different SchemaContext with only the bar module in it
335             //schemaContext = mock(SchemaContext.class);
336             moduleIdentifierSet.clear();
337             ModuleIdentifier bar = mock(ModuleIdentifier.class);
338             when(bar.getNamespace()).thenReturn(new URI("bar"));
339
340             moduleIdentifierSet.add(bar);
341
342             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
343
344             assertEquals("getKnownModules", Sets.newHashSet("foo"),
345                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
346
347         }};
348     }
349
350
351     private static class TestShardManager extends ShardManager {
352         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
353
354         TestShardManager(String shardMrgIDSuffix) {
355             super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
356                     DatastoreContext.newBuilder().build());
357         }
358
359         @Override
360         public void handleRecover(Object message) throws Exception {
361             try {
362                 super.handleRecover(message);
363             } finally {
364                 if(message instanceof RecoveryCompleted) {
365                     recoveryComplete.countDown();
366                 }
367             }
368         }
369
370         void waitForRecoveryComplete() {
371             assertEquals("Recovery complete", true,
372                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
373         }
374     }
375
376     @SuppressWarnings("serial")
377     static class TestShardManagerCreator implements Creator<TestShardManager> {
378         String shardMrgIDSuffix;
379
380         TestShardManagerCreator(String shardMrgIDSuffix) {
381             this.shardMrgIDSuffix = shardMrgIDSuffix;
382         }
383
384         @Override
385         public TestShardManager create() throws Exception {
386             return new TestShardManager(shardMrgIDSuffix);
387         }
388
389     }
390 }