1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.pattern.Patterns;
6 import akka.persistence.RecoveryCompleted;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import akka.util.Timeout;
10 import akka.japi.Creator;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
18 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
28 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
29 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
31 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
32 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
37 import java.util.Collection;
38 import java.util.HashSet;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import static org.junit.Assert.assertEquals;
44 import static org.junit.Assert.assertTrue;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.when;
48 public class ShardManagerTest extends AbstractActorTest {
49 private static int ID_COUNTER = 1;
51 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
52 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
54 private static ActorRef mockShardActor;
58 InMemoryJournal.clear();
60 if(mockShardActor == null) {
61 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
62 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
67 public void tearDown() {
68 InMemoryJournal.clear();
71 private Props newShardMgrProps() {
72 return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
73 DatastoreContext.newBuilder().build());
77 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
78 new JavaTestKit(getSystem()) {{
79 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
81 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
83 shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
85 expectMsgEquals(duration("5 seconds"),
86 new PrimaryNotFound("non-existent").toSerializable());
91 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
92 new JavaTestKit(getSystem()) {{
93 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
95 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
96 shardManager.tell(new ActorInitialized(), mockShardActor);
98 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
100 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
105 public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
106 new JavaTestKit(getSystem()) {{
107 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
109 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
111 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
113 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
118 public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
119 new JavaTestKit(getSystem()) {{
120 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
122 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
124 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
125 // delayed until we send ActorInitialized.
126 Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
127 new Timeout(5, TimeUnit.SECONDS));
129 shardManager.tell(new ActorInitialized(), mockShardActor);
131 Object resp = Await.result(future, duration("5 seconds"));
132 assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
137 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
138 new JavaTestKit(getSystem()) {{
139 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
141 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
143 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
145 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
147 assertEquals("getShardName", "non-existent", notFound.getShardName());
152 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
153 new JavaTestKit(getSystem()) {{
154 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
156 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
157 shardManager.tell(new ActorInitialized(), mockShardActor);
159 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
161 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
163 assertTrue("Found path contains " + found.getPath().path().toString(),
164 found.getPath().path().toString().contains("member-1-shard-default-config"));
169 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
170 new JavaTestKit(getSystem()) {{
171 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
173 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
175 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
177 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
182 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
183 new JavaTestKit(getSystem()) {{
184 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
186 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
188 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
189 // delayed until we send ActorInitialized.
190 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
191 new Timeout(5, TimeUnit.SECONDS));
193 shardManager.tell(new ActorInitialized(), mockShardActor);
195 Object resp = Await.result(future, duration("5 seconds"));
196 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
201 public void testOnReceiveMemberUp() throws Exception {
202 new JavaTestKit(getSystem()) {{
203 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
205 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
207 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
209 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
210 PrimaryFound.SERIALIZABLE_CLASS));
211 String path = found.getPrimaryPath();
212 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
217 public void testOnReceiveMemberDown() throws Exception {
219 new JavaTestKit(getSystem()) {{
220 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
222 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
224 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
226 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
228 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
230 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
232 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
237 public void testOnRecoveryJournalIsCleaned() {
238 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
239 ImmutableSet.of("foo")));
240 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
241 ImmutableSet.of("bar")));
242 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
244 new JavaTestKit(getSystem()) {{
245 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
246 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
248 shardManager.underlyingActor().waitForRecoveryComplete();
249 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
251 // Journal entries up to the last one should've been deleted
252 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
253 synchronized (journal) {
254 assertEquals("Journal size", 1, journal.size());
255 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
261 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
262 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
263 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
265 new JavaTestKit(getSystem()) {{
266 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
267 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
269 shardManager.underlyingActor().waitForRecoveryComplete();
271 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
273 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
278 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
280 new JavaTestKit(getSystem()) {{
281 final TestActorRef<ShardManager> shardManager =
282 TestActorRef.create(getSystem(), newShardMgrProps());
284 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
286 ModuleIdentifier foo = mock(ModuleIdentifier.class);
287 when(foo.getNamespace()).thenReturn(new URI("foo"));
289 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290 moduleIdentifierSet.add(foo);
292 SchemaContext schemaContext = mock(SchemaContext.class);
293 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
295 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
297 assertEquals("getKnownModules", Sets.newHashSet("foo"),
298 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
300 ModuleIdentifier bar = mock(ModuleIdentifier.class);
301 when(bar.getNamespace()).thenReturn(new URI("bar"));
303 moduleIdentifierSet.add(bar);
305 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
307 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
308 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
313 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
315 new JavaTestKit(getSystem()) {{
316 final TestActorRef<ShardManager> shardManager =
317 TestActorRef.create(getSystem(), newShardMgrProps());
319 SchemaContext schemaContext = mock(SchemaContext.class);
320 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
322 ModuleIdentifier foo = mock(ModuleIdentifier.class);
323 when(foo.getNamespace()).thenReturn(new URI("foo"));
325 moduleIdentifierSet.add(foo);
327 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
329 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
331 assertEquals("getKnownModules", Sets.newHashSet("foo"),
332 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
334 //Create a completely different SchemaContext with only the bar module in it
335 //schemaContext = mock(SchemaContext.class);
336 moduleIdentifierSet.clear();
337 ModuleIdentifier bar = mock(ModuleIdentifier.class);
338 when(bar.getNamespace()).thenReturn(new URI("bar"));
340 moduleIdentifierSet.add(bar);
342 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
344 assertEquals("getKnownModules", Sets.newHashSet("foo"),
345 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
351 private static class TestShardManager extends ShardManager {
352 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
354 TestShardManager(String shardMrgIDSuffix) {
355 super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
356 DatastoreContext.newBuilder().build());
360 public void handleRecover(Object message) throws Exception {
362 super.handleRecover(message);
364 if(message instanceof RecoveryCompleted) {
365 recoveryComplete.countDown();
370 void waitForRecoveryComplete() {
371 assertEquals("Recovery complete", true,
372 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
376 @SuppressWarnings("serial")
377 static class TestShardManagerCreator implements Creator<TestShardManager> {
378 String shardMrgIDSuffix;
380 TestShardManagerCreator(String shardMrgIDSuffix) {
381 this.shardMrgIDSuffix = shardMrgIDSuffix;
385 public TestShardManager create() throws Exception {
386 return new TestShardManager(shardMrgIDSuffix);