1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.japi.Creator;
6 import akka.pattern.Patterns;
7 import akka.persistence.RecoveryCompleted;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import akka.util.Timeout;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Sets;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import org.junit.After;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.DataPersistenceProvider;
18 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
20 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
29 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
39 import java.util.Collection;
40 import java.util.HashSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.mock;
50 import static org.mockito.Mockito.when;
52 public class ShardManagerTest extends AbstractActorTest {
53 private static int ID_COUNTER = 1;
55 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
56 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
58 private static ActorRef mockShardActor;
62 InMemoryJournal.clear();
64 if(mockShardActor == null) {
65 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
66 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
71 public void tearDown() {
72 InMemoryJournal.clear();
75 private Props newShardMgrProps() {
76 return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
77 DatastoreContext.newBuilder().build());
81 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
82 new JavaTestKit(getSystem()) {{
83 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
85 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
87 shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
89 expectMsgEquals(duration("5 seconds"),
90 new PrimaryNotFound("non-existent").toSerializable());
95 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
96 new JavaTestKit(getSystem()) {{
97 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
99 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
100 shardManager.tell(new ActorInitialized(), mockShardActor);
102 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
104 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
109 public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
110 new JavaTestKit(getSystem()) {{
111 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
113 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
115 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
117 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
122 public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
123 new JavaTestKit(getSystem()) {{
124 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
126 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
128 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
129 // delayed until we send ActorInitialized.
130 Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
131 new Timeout(5, TimeUnit.SECONDS));
133 shardManager.tell(new ActorInitialized(), mockShardActor);
135 Object resp = Await.result(future, duration("5 seconds"));
136 assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
141 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
142 new JavaTestKit(getSystem()) {{
143 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
145 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
147 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
149 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
151 assertEquals("getShardName", "non-existent", notFound.getShardName());
156 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
157 new JavaTestKit(getSystem()) {{
158 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
160 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
161 shardManager.tell(new ActorInitialized(), mockShardActor);
163 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
165 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
167 assertTrue("Found path contains " + found.getPath().path().toString(),
168 found.getPath().path().toString().contains("member-1-shard-default-config"));
173 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
174 new JavaTestKit(getSystem()) {{
175 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
177 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
179 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
181 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
186 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
187 new JavaTestKit(getSystem()) {{
188 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
190 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
192 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
193 // delayed until we send ActorInitialized.
194 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
195 new Timeout(5, TimeUnit.SECONDS));
197 shardManager.tell(new ActorInitialized(), mockShardActor);
199 Object resp = Await.result(future, duration("5 seconds"));
200 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
205 public void testOnReceiveMemberUp() throws Exception {
206 new JavaTestKit(getSystem()) {{
207 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
209 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
211 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
213 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
214 PrimaryFound.SERIALIZABLE_CLASS));
215 String path = found.getPrimaryPath();
216 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
221 public void testOnReceiveMemberDown() throws Exception {
223 new JavaTestKit(getSystem()) {{
224 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
226 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
228 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
230 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
232 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
234 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
236 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
241 public void testOnRecoveryJournalIsCleaned() {
242 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
243 ImmutableSet.of("foo")));
244 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
245 ImmutableSet.of("bar")));
246 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
248 new JavaTestKit(getSystem()) {{
249 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
250 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
252 shardManager.underlyingActor().waitForRecoveryComplete();
253 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
255 // Journal entries up to the last one should've been deleted
256 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
257 synchronized (journal) {
258 assertEquals("Journal size", 1, journal.size());
259 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
265 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
266 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
267 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
269 new JavaTestKit(getSystem()) {{
270 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
271 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
273 shardManager.underlyingActor().waitForRecoveryComplete();
275 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
277 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
282 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
284 new JavaTestKit(getSystem()) {{
285 final TestActorRef<ShardManager> shardManager =
286 TestActorRef.create(getSystem(), newShardMgrProps());
288 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
290 ModuleIdentifier foo = mock(ModuleIdentifier.class);
291 when(foo.getNamespace()).thenReturn(new URI("foo"));
293 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
294 moduleIdentifierSet.add(foo);
296 SchemaContext schemaContext = mock(SchemaContext.class);
297 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
299 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
301 assertEquals("getKnownModules", Sets.newHashSet("foo"),
302 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
304 ModuleIdentifier bar = mock(ModuleIdentifier.class);
305 when(bar.getNamespace()).thenReturn(new URI("bar"));
307 moduleIdentifierSet.add(bar);
309 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
311 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
312 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
317 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
319 new JavaTestKit(getSystem()) {{
320 final TestActorRef<ShardManager> shardManager =
321 TestActorRef.create(getSystem(), newShardMgrProps());
323 SchemaContext schemaContext = mock(SchemaContext.class);
324 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
326 ModuleIdentifier foo = mock(ModuleIdentifier.class);
327 when(foo.getNamespace()).thenReturn(new URI("foo"));
329 moduleIdentifierSet.add(foo);
331 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
333 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
335 assertEquals("getKnownModules", Sets.newHashSet("foo"),
336 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
338 //Create a completely different SchemaContext with only the bar module in it
339 //schemaContext = mock(SchemaContext.class);
340 moduleIdentifierSet.clear();
341 ModuleIdentifier bar = mock(ModuleIdentifier.class);
342 when(bar.getNamespace()).thenReturn(new URI("bar"));
344 moduleIdentifierSet.add(bar);
346 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
348 assertEquals("getKnownModules", Sets.newHashSet("foo"),
349 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
355 public void testRecoveryApplicable(){
356 new JavaTestKit(getSystem()) {
358 final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
359 new MockClusterWrapper(),
360 new MockConfiguration(),
361 DatastoreContext.newBuilder().persistent(true).build());
362 final TestActorRef<ShardManager> persistentShardManager =
363 TestActorRef.create(getSystem(), persistentProps);
365 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
367 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
369 final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
370 new MockClusterWrapper(),
371 new MockConfiguration(),
372 DatastoreContext.newBuilder().persistent(false).build());
373 final TestActorRef<ShardManager> nonPersistentShardManager =
374 TestActorRef.create(getSystem(), nonPersistentProps);
376 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
378 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
386 public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
388 final CountDownLatch persistLatch = new CountDownLatch(1);
389 final Creator<ShardManager> creator = new Creator<ShardManager>() {
390 private static final long serialVersionUID = 1L;
392 public ShardManager create() throws Exception {
393 return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
395 protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
396 DataPersistenceProviderMonitor dataPersistenceProviderMonitor
397 = new DataPersistenceProviderMonitor();
398 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
399 return dataPersistenceProviderMonitor;
405 new JavaTestKit(getSystem()) {{
407 final TestActorRef<ShardManager> shardManager =
408 TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
410 ModuleIdentifier foo = mock(ModuleIdentifier.class);
411 when(foo.getNamespace()).thenReturn(new URI("foo"));
413 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
414 moduleIdentifierSet.add(foo);
416 SchemaContext schemaContext = mock(SchemaContext.class);
417 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
419 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
421 assertEquals("Persisted", true,
422 Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
429 private static class TestShardManager extends ShardManager {
430 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
432 TestShardManager(String shardMrgIDSuffix) {
433 super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
434 DatastoreContext.newBuilder().build());
438 public void handleRecover(Object message) throws Exception {
440 super.handleRecover(message);
442 if(message instanceof RecoveryCompleted) {
443 recoveryComplete.countDown();
448 void waitForRecoveryComplete() {
449 assertEquals("Recovery complete", true,
450 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
454 @SuppressWarnings("serial")
455 static class TestShardManagerCreator implements Creator<TestShardManager> {
456 String shardMrgIDSuffix;
458 TestShardManagerCreator(String shardMrgIDSuffix) {
459 this.shardMrgIDSuffix = shardMrgIDSuffix;
463 public TestShardManager create() throws Exception {
464 return new TestShardManager(shardMrgIDSuffix);
469 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
470 private static final long serialVersionUID = 1L;
471 private Creator<ShardManager> delegate;
473 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
474 this.delegate = delegate;
478 public ShardManager create() throws Exception {
479 return delegate.create();