1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.persistence.RecoveryCompleted;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import akka.japi.Creator;
9 import com.google.common.collect.ImmutableSet;
10 import com.google.common.collect.Sets;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import org.junit.After;
13 import org.junit.Before;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
17 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
18 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
19 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
20 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
21 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
25 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
27 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
28 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import java.util.Collection;
34 import java.util.HashSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import static org.junit.Assert.assertEquals;
40 import static org.junit.Assert.assertTrue;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.when;
44 public class ShardManagerTest extends AbstractActorTest {
45 private static int ID_COUNTER = 1;
47 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
48 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
50 private static ActorRef mockShardActor;
54 InMemoryJournal.clear();
56 if(mockShardActor == null) {
57 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
58 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
63 public void tearDown() {
64 InMemoryJournal.clear();
67 private Props newShardMgrProps() {
68 return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
69 DatastoreContext.newBuilder().build());
73 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
74 new JavaTestKit(getSystem()) {{
75 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
77 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
79 shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
81 expectMsgEquals(duration("5 seconds"),
82 new PrimaryNotFound("non-existent").toSerializable());
87 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
88 new JavaTestKit(getSystem()) {{
89 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
91 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
92 shardManager.tell(new ActorInitialized(), mockShardActor);
94 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
96 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
101 public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
102 new JavaTestKit(getSystem()) {{
103 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
105 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
107 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
109 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
114 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
115 new JavaTestKit(getSystem()) {{
116 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
118 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
120 shardManager.tell(new FindLocalShard("non-existent"), getRef());
122 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
124 assertEquals("getShardName", "non-existent", notFound.getShardName());
129 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
130 new JavaTestKit(getSystem()) {{
131 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
133 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
134 shardManager.tell(new ActorInitialized(), mockShardActor);
136 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
138 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
140 assertTrue("Found path contains " + found.getPath().path().toString(),
141 found.getPath().path().toString().contains("member-1-shard-default-config"));
146 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
147 new JavaTestKit(getSystem()) {{
148 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
150 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
151 //shardManager.tell(new ActorInitialized(), mockShardActor);
153 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
155 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
160 public void testOnReceiveMemberUp() throws Exception {
161 new JavaTestKit(getSystem()) {{
162 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
164 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
166 shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
168 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
169 PrimaryFound.SERIALIZABLE_CLASS));
170 String path = found.getPrimaryPath();
171 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
176 public void testOnReceiveMemberDown() throws Exception {
178 new JavaTestKit(getSystem()) {{
179 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
181 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
183 shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
185 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
187 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
189 shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
191 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
196 public void testOnRecoveryJournalIsCleaned() {
197 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
198 ImmutableSet.of("foo")));
199 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
200 ImmutableSet.of("bar")));
201 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
203 new JavaTestKit(getSystem()) {{
204 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
205 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
207 shardManager.underlyingActor().waitForRecoveryComplete();
208 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
210 // Journal entries up to the last one should've been deleted
211 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
212 synchronized (journal) {
213 assertEquals("Journal size", 1, journal.size());
214 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
220 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
221 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
222 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
224 new JavaTestKit(getSystem()) {{
225 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
226 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
228 shardManager.underlyingActor().waitForRecoveryComplete();
230 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
232 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
237 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
239 new JavaTestKit(getSystem()) {{
240 final TestActorRef<ShardManager> shardManager =
241 TestActorRef.create(getSystem(), newShardMgrProps());
243 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
245 ModuleIdentifier foo = mock(ModuleIdentifier.class);
246 when(foo.getNamespace()).thenReturn(new URI("foo"));
248 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
249 moduleIdentifierSet.add(foo);
251 SchemaContext schemaContext = mock(SchemaContext.class);
252 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
254 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
256 assertEquals("getKnownModules", Sets.newHashSet("foo"),
257 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
259 ModuleIdentifier bar = mock(ModuleIdentifier.class);
260 when(bar.getNamespace()).thenReturn(new URI("bar"));
262 moduleIdentifierSet.add(bar);
264 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
266 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
267 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
272 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
274 new JavaTestKit(getSystem()) {{
275 final TestActorRef<ShardManager> shardManager =
276 TestActorRef.create(getSystem(), newShardMgrProps());
278 SchemaContext schemaContext = mock(SchemaContext.class);
279 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
281 ModuleIdentifier foo = mock(ModuleIdentifier.class);
282 when(foo.getNamespace()).thenReturn(new URI("foo"));
284 moduleIdentifierSet.add(foo);
286 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
288 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
290 assertEquals("getKnownModules", Sets.newHashSet("foo"),
291 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
293 //Create a completely different SchemaContext with only the bar module in it
294 //schemaContext = mock(SchemaContext.class);
295 moduleIdentifierSet.clear();
296 ModuleIdentifier bar = mock(ModuleIdentifier.class);
297 when(bar.getNamespace()).thenReturn(new URI("bar"));
299 moduleIdentifierSet.add(bar);
301 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
303 assertEquals("getKnownModules", Sets.newHashSet("foo"),
304 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
310 private static class TestShardManager extends ShardManager {
311 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
313 TestShardManager(String shardMrgIDSuffix) {
314 super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
315 DatastoreContext.newBuilder().build());
319 public void handleRecover(Object message) throws Exception {
321 super.handleRecover(message);
323 if(message instanceof RecoveryCompleted) {
324 recoveryComplete.countDown();
329 void waitForRecoveryComplete() {
330 assertEquals("Recovery complete", true,
331 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
335 @SuppressWarnings("serial")
336 static class TestShardManagerCreator implements Creator<TestShardManager> {
337 String shardMrgIDSuffix;
339 TestShardManagerCreator(String shardMrgIDSuffix) {
340 this.shardMrgIDSuffix = shardMrgIDSuffix;
344 public TestShardManager create() throws Exception {
345 return new TestShardManager(shardMrgIDSuffix);