1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.pattern.Patterns;
15 import akka.persistence.RecoveryCompleted;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestActorRef;
18 import akka.util.Timeout;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Sets;
21 import com.google.common.util.concurrent.Uninterruptibles;
23 import java.util.Collection;
24 import java.util.HashSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.DataPersistenceProvider;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
37 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
42 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
45 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
46 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
48 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
49 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import scala.concurrent.Await;
55 import scala.concurrent.Future;
57 public class ShardManagerTest extends AbstractActorTest {
58 private static int ID_COUNTER = 1;
60 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
61 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
64 private static CountDownLatch ready;
66 private static ActorRef mockShardActor;
// Fragment of the per-test fixture initialisation; the enclosing method's header is not part of
// this excerpt. Populates the Mockito-annotated fields of this test instance.
70 MockitoAnnotations.initMocks(this);
// Empty the in-memory journal before the test body runs.
72 InMemoryJournal.clear();
// mockShardActor is static, so the actor is created only the first time this code executes.
74 if(mockShardActor == null) {
// The actor is named after the identifier of the default shard for member-1 / "config".
75 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
// DoNothingActor is used as the stand-in implementation for the shard.
76 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
81 public void tearDown() {
82 InMemoryJournal.clear();
85 private Props newShardMgrProps() {
86 DatastoreContext.Builder builder = DatastoreContext.newBuilder();
87 builder.dataStoreType(shardMrgIDSuffix);
88 return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
89 builder.build(), ready);
93 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
94 new JavaTestKit(getSystem()) {{
95 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
97 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
99 shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
101 expectMsgEquals(duration("5 seconds"),
102 new PrimaryNotFound("non-existent").toSerializable());
107 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
108 new JavaTestKit(getSystem()) {{
109 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
111 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
112 shardManager.tell(new ActorInitialized(), mockShardActor);
114 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
116 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
121 public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
122 new JavaTestKit(getSystem()) {{
123 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
125 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
127 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
132 public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
133 new JavaTestKit(getSystem()) {{
134 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
136 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
138 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
139 // delayed until we send ActorInitialized.
140 Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
141 new Timeout(5, TimeUnit.SECONDS));
143 shardManager.tell(new ActorInitialized(), mockShardActor);
145 Object resp = Await.result(future, duration("5 seconds"));
146 assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
151 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
152 new JavaTestKit(getSystem()) {{
153 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
155 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
157 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
159 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
161 assertEquals("getShardName", "non-existent", notFound.getShardName());
166 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
167 new JavaTestKit(getSystem()) {{
168 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
170 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
171 shardManager.tell(new ActorInitialized(), mockShardActor);
173 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
175 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
177 assertTrue("Found path contains " + found.getPath().path().toString(),
178 found.getPath().path().toString().contains("member-1-shard-default-config"));
183 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
184 new JavaTestKit(getSystem()) {{
185 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
187 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
189 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
194 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
195 new JavaTestKit(getSystem()) {{
196 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
198 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
200 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
201 // delayed until we send ActorInitialized.
202 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
203 new Timeout(5, TimeUnit.SECONDS));
205 shardManager.tell(new ActorInitialized(), mockShardActor);
207 Object resp = Await.result(future, duration("5 seconds"));
208 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
213 public void testOnReceiveMemberUp() throws Exception {
214 new JavaTestKit(getSystem()) {{
215 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
217 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
219 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
221 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
222 PrimaryFound.SERIALIZABLE_CLASS));
223 String path = found.getPrimaryPath();
224 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
229 public void testOnReceiveMemberDown() throws Exception {
231 new JavaTestKit(getSystem()) {{
232 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
234 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
236 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
238 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
240 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
242 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
244 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
249 public void testOnRecoveryJournalIsCleaned() {
250 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
251 ImmutableSet.of("foo")));
252 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
253 ImmutableSet.of("bar")));
254 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
256 new JavaTestKit(getSystem()) {{
257 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
258 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
260 shardManager.underlyingActor().waitForRecoveryComplete();
261 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
263 // Journal entries up to the last one should've been deleted
264 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
265 synchronized (journal) {
266 assertEquals("Journal size", 1, journal.size());
267 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
273 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
274 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
275 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
277 new JavaTestKit(getSystem()) {{
278 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
279 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
281 shardManager.underlyingActor().waitForRecoveryComplete();
283 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
285 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
290 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
292 new JavaTestKit(getSystem()) {{
293 final TestActorRef<ShardManager> shardManager =
294 TestActorRef.create(getSystem(), newShardMgrProps());
296 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
298 ModuleIdentifier foo = mock(ModuleIdentifier.class);
299 when(foo.getNamespace()).thenReturn(new URI("foo"));
301 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
302 moduleIdentifierSet.add(foo);
304 SchemaContext schemaContext = mock(SchemaContext.class);
305 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
307 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
309 assertEquals("getKnownModules", Sets.newHashSet("foo"),
310 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
312 ModuleIdentifier bar = mock(ModuleIdentifier.class);
313 when(bar.getNamespace()).thenReturn(new URI("bar"));
315 moduleIdentifierSet.add(bar);
317 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
319 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
320 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
325 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
327 new JavaTestKit(getSystem()) {{
328 final TestActorRef<ShardManager> shardManager =
329 TestActorRef.create(getSystem(), newShardMgrProps());
331 SchemaContext schemaContext = mock(SchemaContext.class);
332 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
334 ModuleIdentifier foo = mock(ModuleIdentifier.class);
335 when(foo.getNamespace()).thenReturn(new URI("foo"));
337 moduleIdentifierSet.add(foo);
339 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
341 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
343 assertEquals("getKnownModules", Sets.newHashSet("foo"),
344 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
346 //Create a completely different SchemaContext with only the bar module in it
347 //schemaContext = mock(SchemaContext.class);
348 moduleIdentifierSet.clear();
349 ModuleIdentifier bar = mock(ModuleIdentifier.class);
350 when(bar.getNamespace()).thenReturn(new URI("bar"));
352 moduleIdentifierSet.add(bar);
354 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
356 assertEquals("getKnownModules", Sets.newHashSet("foo"),
357 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
363 public void testRecoveryApplicable(){
364 new JavaTestKit(getSystem()) {
366 final Props persistentProps = ShardManager.props(
367 new MockClusterWrapper(),
368 new MockConfiguration(),
369 DatastoreContext.newBuilder().persistent(true).build(), ready);
370 final TestActorRef<ShardManager> persistentShardManager =
371 TestActorRef.create(getSystem(), persistentProps);
373 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
375 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
377 final Props nonPersistentProps = ShardManager.props(
378 new MockClusterWrapper(),
379 new MockConfiguration(),
380 DatastoreContext.newBuilder().persistent(false).build(), ready);
381 final TestActorRef<ShardManager> nonPersistentShardManager =
382 TestActorRef.create(getSystem(), nonPersistentProps);
384 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
386 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
394 public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
396 final CountDownLatch persistLatch = new CountDownLatch(1);
397 final Creator<ShardManager> creator = new Creator<ShardManager>() {
398 private static final long serialVersionUID = 1L;
400 public ShardManager create() throws Exception {
401 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
403 protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
404 DataPersistenceProviderMonitor dataPersistenceProviderMonitor
405 = new DataPersistenceProviderMonitor();
406 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
407 return dataPersistenceProviderMonitor;
413 new JavaTestKit(getSystem()) {{
415 final TestActorRef<ShardManager> shardManager =
416 TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
418 ModuleIdentifier foo = mock(ModuleIdentifier.class);
419 when(foo.getNamespace()).thenReturn(new URI("foo"));
421 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
422 moduleIdentifierSet.add(foo);
424 SchemaContext schemaContext = mock(SchemaContext.class);
425 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
427 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
429 assertEquals("Persisted", true,
430 Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
436 public void testRoleChangeNotificationReleaseReady() throws Exception {
437 new JavaTestKit(getSystem()) {
439 final Props persistentProps = ShardManager.props(
440 new MockClusterWrapper(),
441 new MockConfiguration(),
442 DatastoreContext.newBuilder().persistent(true).build(), ready);
443 final TestActorRef<ShardManager> shardManager =
444 TestActorRef.create(getSystem(), persistentProps);
446 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
448 verify(ready, times(1)).countDown();
454 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
455 new JavaTestKit(getSystem()) {
457 final Props persistentProps = ShardManager.props(
458 new MockClusterWrapper(),
459 new MockConfiguration(),
460 DatastoreContext.newBuilder().persistent(true).build(), ready);
461 final TestActorRef<ShardManager> shardManager =
462 TestActorRef.create(getSystem(), persistentProps);
464 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
466 verify(ready, never()).countDown();
473 private static class TestShardManager extends ShardManager {
474 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
476 TestShardManager(String shardMrgIDSuffix) {
477 super(new MockClusterWrapper(), new MockConfiguration(),
478 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
482 public void handleRecover(Object message) throws Exception {
484 super.handleRecover(message);
486 if(message instanceof RecoveryCompleted) {
487 recoveryComplete.countDown();
492 void waitForRecoveryComplete() {
493 assertEquals("Recovery complete", true,
494 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
498 @SuppressWarnings("serial")
499 static class TestShardManagerCreator implements Creator<TestShardManager> {
500 String shardMrgIDSuffix;
502 TestShardManagerCreator(String shardMrgIDSuffix) {
503 this.shardMrgIDSuffix = shardMrgIDSuffix;
507 public TestShardManager create() throws Exception {
508 return new TestShardManager(shardMrgIDSuffix);
513 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
514 private static final long serialVersionUID = 1L;
515 private Creator<ShardManager> delegate;
517 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
518 this.delegate = delegate;
522 public ShardManager create() throws Exception {
523 return delegate.create();