package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.persistence.PersistentConfirmation;
import akka.persistence.PersistentId;
import akka.persistence.PersistentImpl;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;

import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

57 public class ShardManagerTest {
58 private static ActorSystem system;
59 Configuration mockConfig = new MockConfiguration();
60 private static ActorRef defaultShardMockActor;
63 public static void setUpClass() {
64 Map<String, String> myJournal = new HashMap<>();
65 myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
66 myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
67 Config config = ConfigFactory.load()
68 .withValue("akka.persistence.journal.plugin",
69 ConfigValueFactory.fromAnyRef("my-journal"))
70 .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
74 system = ActorSystem.create("test", config);
76 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
77 defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
83 public static void tearDown() {
84 JavaTestKit.shutdownActorSystem(system);
89 public void setUpTest(){
94 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
96 new JavaTestKit(system) {
98 final Props props = ShardManager
99 .props("config", new MockClusterWrapper(),
100 new MockConfiguration(), new DatastoreContext());
102 final ActorRef subject = getSystem().actorOf(props);
104 subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
106 expectMsgEquals(duration("2 seconds"),
107 new PrimaryNotFound("inventory").toSerializable());
112 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
114 new JavaTestKit(system) {{
115 final Props props = ShardManager
116 .props("config", new MockClusterWrapper(),
117 new MockConfiguration(), new DatastoreContext());
119 final ActorRef subject = getSystem().actorOf(props);
121 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
122 subject.tell(new ActorInitialized(), defaultShardMockActor);
124 subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
126 expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
132 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
134 new JavaTestKit(system) {{
135 final Props props = ShardManager
136 .props("config", new MockClusterWrapper(),
137 new MockConfiguration(), new DatastoreContext());
139 final ActorRef subject = getSystem().actorOf(props);
141 subject.tell(new FindLocalShard("inventory"), getRef());
143 final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
145 protected String match(Object in) {
146 if (in instanceof LocalShardNotFound) {
147 return ((LocalShardNotFound) in).getShardName();
152 }.get(); // this extracts the received message
154 assertEquals("inventory", out);
159 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
161 final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
163 new JavaTestKit(system) {{
164 final Props props = ShardManager
165 .props("config", mockClusterWrapper,
166 new MockConfiguration(), new DatastoreContext());
168 final ActorRef subject = getSystem().actorOf(props);
170 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
171 subject.tell(new ActorInitialized(), defaultShardMockActor);
173 subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
175 final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
177 protected ActorRef match(Object in) {
178 if (in instanceof LocalShardFound) {
179 return ((LocalShardFound) in).getPath();
184 }.get(); // this extracts the received message
186 assertTrue(out.path().toString(),
187 out.path().toString().contains("member-1-shard-default-config"));
192 public void testOnReceiveMemberUp() throws Exception {
194 new JavaTestKit(system) {{
195 final Props props = ShardManager
196 .props("config", new MockClusterWrapper(),
197 new MockConfiguration(), new DatastoreContext());
199 final ActorRef subject = getSystem().actorOf(props);
201 MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
203 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
205 final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
206 // do not put code outside this method, will run afterwards
208 protected String match(Object in) {
209 if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
210 PrimaryFound f = PrimaryFound.fromSerializable(in);
211 return f.getPrimaryPath();
216 }.get(); // this extracts the received message
218 assertTrue(out, out.contains("member-2-shard-astronauts-config"));
223 public void testOnReceiveMemberDown() throws Exception {
225 new JavaTestKit(system) {{
226 final Props props = ShardManager
227 .props("config", new MockClusterWrapper(),
228 new MockConfiguration(), new DatastoreContext());
230 final ActorRef subject = getSystem().actorOf(props);
232 MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
234 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
236 expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
238 MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
240 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
242 expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
247 public void testOnRecoveryJournalIsEmptied(){
248 MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
249 ImmutableSet.of("foo")));
251 assertEquals(1, MyJournal.get().size());
253 new JavaTestKit(system) {{
254 final Props props = ShardManager
255 .props("config", new MockClusterWrapper(),
256 new MockConfiguration(), new DatastoreContext());
258 final ActorRef subject = getSystem().actorOf(props);
260 // Send message to check that ShardManager is ready
261 subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
263 expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
265 assertEquals(0, MyJournal.get().size());
270 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
271 new JavaTestKit(system) {{
272 final Props props = ShardManager
273 .props("config", new MockClusterWrapper(),
274 new MockConfiguration(), new DatastoreContext());
275 final TestActorRef<ShardManager> subject =
276 TestActorRef.create(system, props);
278 subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
280 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
282 assertTrue(knownModules.contains("foo"));
287 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
289 new JavaTestKit(system) {{
290 final Props props = ShardManager
291 .props("config", new MockClusterWrapper(),
292 new MockConfiguration(), new DatastoreContext());
293 final TestActorRef<ShardManager> subject =
294 TestActorRef.create(system, props);
296 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
298 assertEquals(0, knownModules.size());
300 SchemaContext schemaContext = mock(SchemaContext.class);
301 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
303 ModuleIdentifier foo = mock(ModuleIdentifier.class);
304 when(foo.getNamespace()).thenReturn(new URI("foo"));
306 moduleIdentifierSet.add(foo);
308 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
310 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
312 assertTrue(knownModules.contains("foo"));
314 assertEquals(1, knownModules.size());
316 ModuleIdentifier bar = mock(ModuleIdentifier.class);
317 when(bar.getNamespace()).thenReturn(new URI("bar"));
319 moduleIdentifierSet.add(bar);
321 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
323 assertTrue(knownModules.contains("bar"));
325 assertEquals(2, knownModules.size());
333 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
335 new JavaTestKit(system) {{
336 final Props props = ShardManager
337 .props("config", new MockClusterWrapper(),
338 new MockConfiguration(), new DatastoreContext());
339 final TestActorRef<ShardManager> subject =
340 TestActorRef.create(system, props);
342 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
344 assertEquals(0, knownModules.size());
346 SchemaContext schemaContext = mock(SchemaContext.class);
347 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
349 ModuleIdentifier foo = mock(ModuleIdentifier.class);
350 when(foo.getNamespace()).thenReturn(new URI("foo"));
352 moduleIdentifierSet.add(foo);
354 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
356 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
358 assertTrue(knownModules.contains("foo"));
360 assertEquals(1, knownModules.size());
362 //Create a completely different SchemaContext with only the bar module in it
363 schemaContext = mock(SchemaContext.class);
364 moduleIdentifierSet = new HashSet<>();
365 ModuleIdentifier bar = mock(ModuleIdentifier.class);
366 when(bar.getNamespace()).thenReturn(new URI("bar"));
368 moduleIdentifierSet.add(bar);
370 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
372 assertFalse(knownModules.contains("bar"));
374 assertEquals(1, knownModules.size());
381 private void sleep(long period){
382 Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
385 public static class MyJournal extends AsyncWriteJournal {
387 private static Map<Long, Object> journal = Maps.newTreeMap();
389 public static void addToJournal(Long sequenceNr, Object value){
390 journal.put(sequenceNr, value);
393 public static Map<Long, Object> get(){
397 public static void clear(){
401 @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
402 final Procedure<PersistentRepr> replayCallback) {
403 if(journal.size() == 0){
404 return Futures.successful(null);
406 return Futures.future(new Callable<Void>() {
408 public Void call() throws Exception {
409 for (Map.Entry<Long, Object> entry : journal.entrySet()) {
410 PersistentRepr persistentMessage =
411 new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
413 replayCallback.apply(persistentMessage);
417 }, context().dispatcher());
420 @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
421 return Futures.successful(-1L);
424 @Override public Future<Void> doAsyncWriteMessages(
425 final Iterable<PersistentRepr> persistentReprs) {
426 return Futures.future(new Callable<Void>() {
428 public Void call() throws Exception {
429 for (PersistentRepr repr : persistentReprs){
430 if(repr.payload() instanceof ShardManager.SchemaContextModules) {
431 journal.put(repr.sequenceNr(), repr.payload());
436 }, context().dispatcher());
439 @Override public Future<Void> doAsyncWriteConfirmations(
440 Iterable<PersistentConfirmation> persistentConfirmations) {
441 return Futures.successful(null);
444 @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
447 return Futures.successful(null);
450 @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
452 return Futures.successful(null);