1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.japi.Creator;
6 import akka.pattern.Patterns;
7 import akka.persistence.RecoveryCompleted;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import akka.util.Timeout;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.DataPersistenceProvider;
18 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
29 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
39 import java.util.Collection;
40 import java.util.HashSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.mock;
50 import static org.mockito.Mockito.when;
52 public class ShardManagerTest extends AbstractActorTest {
53 private static int ID_COUNTER = 1;
55 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
56 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
58 private static ActorRef mockShardActor;
62 InMemoryJournal.clear();
64 if(mockShardActor == null) {
65 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
66 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
71 public void tearDown() {
72 InMemoryJournal.clear();
75 private Props newShardMgrProps() {
76 return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
77 DatastoreContext.newBuilder().build());
81 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
82 new JavaTestKit(getSystem()) {{
83 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
85 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
87 shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
89 expectMsgEquals(duration("5 seconds"),
90 new PrimaryNotFound("non-existent").toSerializable());
95 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
96 new JavaTestKit(getSystem()) {{
97 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
99 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
100 shardManager.tell(new ActorInitialized(), mockShardActor);
102 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
104 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
109 public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
110 new JavaTestKit(getSystem()) {{
111 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
113 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
115 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
120 public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
121 new JavaTestKit(getSystem()) {{
122 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
124 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
126 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
127 // delayed until we send ActorInitialized.
128 Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
129 new Timeout(5, TimeUnit.SECONDS));
131 shardManager.tell(new ActorInitialized(), mockShardActor);
133 Object resp = Await.result(future, duration("5 seconds"));
134 assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
139 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
140 new JavaTestKit(getSystem()) {{
141 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
143 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
145 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
147 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
149 assertEquals("getShardName", "non-existent", notFound.getShardName());
154 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
155 new JavaTestKit(getSystem()) {{
156 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
158 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159 shardManager.tell(new ActorInitialized(), mockShardActor);
161 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
163 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
165 assertTrue("Found path contains " + found.getPath().path().toString(),
166 found.getPath().path().toString().contains("member-1-shard-default-config"));
171 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
172 new JavaTestKit(getSystem()) {{
173 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
175 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
177 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
182 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
183 new JavaTestKit(getSystem()) {{
184 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
186 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
188 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
189 // delayed until we send ActorInitialized.
190 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
191 new Timeout(5, TimeUnit.SECONDS));
193 shardManager.tell(new ActorInitialized(), mockShardActor);
195 Object resp = Await.result(future, duration("5 seconds"));
196 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
201 public void testOnReceiveMemberUp() throws Exception {
202 new JavaTestKit(getSystem()) {{
203 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
205 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
207 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
209 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
210 PrimaryFound.SERIALIZABLE_CLASS));
211 String path = found.getPrimaryPath();
212 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
217 public void testOnReceiveMemberDown() throws Exception {
219 new JavaTestKit(getSystem()) {{
220 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
222 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
224 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
226 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
228 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
230 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
232 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
237 public void testOnRecoveryJournalIsCleaned() {
238 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
239 ImmutableSet.of("foo")));
240 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
241 ImmutableSet.of("bar")));
242 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
244 new JavaTestKit(getSystem()) {{
245 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
246 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
248 shardManager.underlyingActor().waitForRecoveryComplete();
249 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
251 // Journal entries up to the last one should've been deleted
252 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
253 synchronized (journal) {
254 assertEquals("Journal size", 1, journal.size());
255 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
261 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
262 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
263 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
265 new JavaTestKit(getSystem()) {{
266 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
267 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
269 shardManager.underlyingActor().waitForRecoveryComplete();
271 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
273 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
278 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
280 new JavaTestKit(getSystem()) {{
281 final TestActorRef<ShardManager> shardManager =
282 TestActorRef.create(getSystem(), newShardMgrProps());
284 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
286 ModuleIdentifier foo = mock(ModuleIdentifier.class);
287 when(foo.getNamespace()).thenReturn(new URI("foo"));
289 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290 moduleIdentifierSet.add(foo);
292 SchemaContext schemaContext = mock(SchemaContext.class);
293 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
295 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
297 assertEquals("getKnownModules", Sets.newHashSet("foo"),
298 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
300 ModuleIdentifier bar = mock(ModuleIdentifier.class);
301 when(bar.getNamespace()).thenReturn(new URI("bar"));
303 moduleIdentifierSet.add(bar);
305 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
307 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
308 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
313 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
315 new JavaTestKit(getSystem()) {{
316 final TestActorRef<ShardManager> shardManager =
317 TestActorRef.create(getSystem(), newShardMgrProps());
319 SchemaContext schemaContext = mock(SchemaContext.class);
320 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
322 ModuleIdentifier foo = mock(ModuleIdentifier.class);
323 when(foo.getNamespace()).thenReturn(new URI("foo"));
325 moduleIdentifierSet.add(foo);
327 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
329 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
331 assertEquals("getKnownModules", Sets.newHashSet("foo"),
332 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
334 //Create a completely different SchemaContext with only the bar module in it
335 //schemaContext = mock(SchemaContext.class);
336 moduleIdentifierSet.clear();
337 ModuleIdentifier bar = mock(ModuleIdentifier.class);
338 when(bar.getNamespace()).thenReturn(new URI("bar"));
340 moduleIdentifierSet.add(bar);
342 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
344 assertEquals("getKnownModules", Sets.newHashSet("foo"),
345 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
351 public void testRecoveryApplicable(){
352 new JavaTestKit(getSystem()) {
354 final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
355 new MockClusterWrapper(),
356 new MockConfiguration(),
357 DatastoreContext.newBuilder().persistent(true).build());
358 final TestActorRef<ShardManager> persistentShardManager =
359 TestActorRef.create(getSystem(), persistentProps);
361 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
363 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
365 final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
366 new MockClusterWrapper(),
367 new MockConfiguration(),
368 DatastoreContext.newBuilder().persistent(false).build());
369 final TestActorRef<ShardManager> nonPersistentShardManager =
370 TestActorRef.create(getSystem(), nonPersistentProps);
372 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
374 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
382 public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
384 final CountDownLatch persistLatch = new CountDownLatch(1);
385 final Creator<ShardManager> creator = new Creator<ShardManager>() {
386 private static final long serialVersionUID = 1L;
388 public ShardManager create() throws Exception {
389 return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
391 protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
392 DataPersistenceProviderMonitor dataPersistenceProviderMonitor
393 = new DataPersistenceProviderMonitor();
394 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
395 return dataPersistenceProviderMonitor;
401 new JavaTestKit(getSystem()) {{
403 final TestActorRef<ShardManager> shardManager =
404 TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
406 ModuleIdentifier foo = mock(ModuleIdentifier.class);
407 when(foo.getNamespace()).thenReturn(new URI("foo"));
409 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
410 moduleIdentifierSet.add(foo);
412 SchemaContext schemaContext = mock(SchemaContext.class);
413 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
415 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
417 assertEquals("Persisted", true,
418 Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
425 private static class TestShardManager extends ShardManager {
426 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
428 TestShardManager(String shardMrgIDSuffix) {
429 super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
430 DatastoreContext.newBuilder().build());
434 public void handleRecover(Object message) throws Exception {
436 super.handleRecover(message);
438 if(message instanceof RecoveryCompleted) {
439 recoveryComplete.countDown();
444 void waitForRecoveryComplete() {
445 assertEquals("Recovery complete", true,
446 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
450 @SuppressWarnings("serial")
451 static class TestShardManagerCreator implements Creator<TestShardManager> {
452 String shardMrgIDSuffix;
454 TestShardManagerCreator(String shardMrgIDSuffix) {
455 this.shardMrgIDSuffix = shardMrgIDSuffix;
459 public TestShardManager create() throws Exception {
460 return new TestShardManager(shardMrgIDSuffix);
465 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
466 private static final long serialVersionUID = 1L;
467 private Creator<ShardManager> delegate;
469 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
470 this.delegate = delegate;
474 public ShardManager create() throws Exception {
475 return delegate.create();