Bug 2135: Create ShardInformation on startup
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.persistence.RecoveryCompleted;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import akka.japi.Creator;
9 import com.google.common.collect.ImmutableSet;
10 import com.google.common.collect.Sets;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import org.junit.After;
13 import org.junit.Before;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
17 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
18 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
19 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
20 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
21 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
25 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
27 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
28 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import java.net.URI;
33 import java.util.Collection;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import static org.junit.Assert.assertEquals;
40 import static org.junit.Assert.assertTrue;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.when;
43
44 public class ShardManagerTest extends AbstractActorTest {
45     private static int ID_COUNTER = 1;
46
47     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
48     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
49
50     private static ActorRef mockShardActor;
51
52     @Before
53     public void setUp() {
54         InMemoryJournal.clear();
55
56         if(mockShardActor == null) {
57             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
58             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
59         }
60     }
61
62     @After
63     public void tearDown() {
64         InMemoryJournal.clear();
65     }
66
67     private Props newShardMgrProps() {
68         return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
69                 DatastoreContext.newBuilder().build());
70     }
71
72     @Test
73     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
74         new JavaTestKit(getSystem()) {{
75             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
76
77             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
78
79             shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
80
81             expectMsgEquals(duration("5 seconds"),
82                     new PrimaryNotFound("non-existent").toSerializable());
83         }};
84     }
85
86     @Test
87     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
88         new JavaTestKit(getSystem()) {{
89             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
90
91             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
92             shardManager.tell(new ActorInitialized(), mockShardActor);
93
94             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
95
96             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
97         }};
98     }
99
100     @Test
101     public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
102         new JavaTestKit(getSystem()) {{
103             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
104
105             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
106
107             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
108
109             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
110         }};
111     }
112
113     @Test
114     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
115         new JavaTestKit(getSystem()) {{
116             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
117
118             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
119
120             shardManager.tell(new FindLocalShard("non-existent"), getRef());
121
122             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
123
124             assertEquals("getShardName", "non-existent", notFound.getShardName());
125         }};
126     }
127
128     @Test
129     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
130         new JavaTestKit(getSystem()) {{
131             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
132
133             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
134             shardManager.tell(new ActorInitialized(), mockShardActor);
135
136             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
137
138             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
139
140             assertTrue("Found path contains " + found.getPath().path().toString(),
141                     found.getPath().path().toString().contains("member-1-shard-default-config"));
142         }};
143     }
144
145     @Test
146     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
147         new JavaTestKit(getSystem()) {{
148             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
149
150             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
151             //shardManager.tell(new ActorInitialized(), mockShardActor);
152
153             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
154
155             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
156         }};
157     }
158
159     @Test
160     public void testOnReceiveMemberUp() throws Exception {
161         new JavaTestKit(getSystem()) {{
162             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
163
164             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
165
166             shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
167
168             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
169                     PrimaryFound.SERIALIZABLE_CLASS));
170             String path = found.getPrimaryPath();
171             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
172         }};
173     }
174
175     @Test
176     public void testOnReceiveMemberDown() throws Exception {
177
178         new JavaTestKit(getSystem()) {{
179             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
180
181             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
182
183             shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
184
185             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
186
187             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
188
189             shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
190
191             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
192         }};
193     }
194
195     @Test
196     public void testOnRecoveryJournalIsCleaned() {
197         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
198                 ImmutableSet.of("foo")));
199         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
200                 ImmutableSet.of("bar")));
201         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
202
203         new JavaTestKit(getSystem()) {{
204             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
205                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
206
207             shardManager.underlyingActor().waitForRecoveryComplete();
208             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
209
210             // Journal entries up to the last one should've been deleted
211             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
212             synchronized (journal) {
213                 assertEquals("Journal size", 1, journal.size());
214                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
215             }
216         }};
217     }
218
219     @Test
220     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
221         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
222         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
223                 persistedModules));
224         new JavaTestKit(getSystem()) {{
225             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
226                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
227
228             shardManager.underlyingActor().waitForRecoveryComplete();
229
230             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
231
232             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
233         }};
234     }
235
236     @Test
237     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
238             throws Exception {
239         new JavaTestKit(getSystem()) {{
240             final TestActorRef<ShardManager> shardManager =
241                     TestActorRef.create(getSystem(), newShardMgrProps());
242
243             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
244
245             ModuleIdentifier foo = mock(ModuleIdentifier.class);
246             when(foo.getNamespace()).thenReturn(new URI("foo"));
247
248             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
249             moduleIdentifierSet.add(foo);
250
251             SchemaContext schemaContext = mock(SchemaContext.class);
252             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
253
254             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
255
256             assertEquals("getKnownModules", Sets.newHashSet("foo"),
257                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
258
259             ModuleIdentifier bar = mock(ModuleIdentifier.class);
260             when(bar.getNamespace()).thenReturn(new URI("bar"));
261
262             moduleIdentifierSet.add(bar);
263
264             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
265
266             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
267                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
268         }};
269     }
270
271     @Test
272     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
273             throws Exception {
274         new JavaTestKit(getSystem()) {{
275             final TestActorRef<ShardManager> shardManager =
276                     TestActorRef.create(getSystem(), newShardMgrProps());
277
278             SchemaContext schemaContext = mock(SchemaContext.class);
279             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
280
281             ModuleIdentifier foo = mock(ModuleIdentifier.class);
282             when(foo.getNamespace()).thenReturn(new URI("foo"));
283
284             moduleIdentifierSet.add(foo);
285
286             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
287
288             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
289
290             assertEquals("getKnownModules", Sets.newHashSet("foo"),
291                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
292
293             //Create a completely different SchemaContext with only the bar module in it
294             //schemaContext = mock(SchemaContext.class);
295             moduleIdentifierSet.clear();
296             ModuleIdentifier bar = mock(ModuleIdentifier.class);
297             when(bar.getNamespace()).thenReturn(new URI("bar"));
298
299             moduleIdentifierSet.add(bar);
300
301             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
302
303             assertEquals("getKnownModules", Sets.newHashSet("foo"),
304                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
305
306         }};
307     }
308
309
310     private static class TestShardManager extends ShardManager {
311         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
312
313         TestShardManager(String shardMrgIDSuffix) {
314             super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
315                     DatastoreContext.newBuilder().build());
316         }
317
318         @Override
319         public void handleRecover(Object message) throws Exception {
320             try {
321                 super.handleRecover(message);
322             } finally {
323                 if(message instanceof RecoveryCompleted) {
324                     recoveryComplete.countDown();
325                 }
326             }
327         }
328
329         void waitForRecoveryComplete() {
330             assertEquals("Recovery complete", true,
331                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
332         }
333     }
334
335     @SuppressWarnings("serial")
336     static class TestShardManagerCreator implements Creator<TestShardManager> {
337         String shardMrgIDSuffix;
338
339         TestShardManagerCreator(String shardMrgIDSuffix) {
340             this.shardMrgIDSuffix = shardMrgIDSuffix;
341         }
342
343         @Override
344         public TestShardManager create() throws Exception {
345             return new TestShardManager(shardMrgIDSuffix);
346         }
347
348     }
349 }