1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7 import akka.japi.Procedure;
8 import akka.persistence.PersistentConfirmation;
9 import akka.persistence.PersistentId;
10 import akka.persistence.PersistentImpl;
11 import akka.persistence.PersistentRepr;
12 import akka.persistence.journal.japi.AsyncWriteJournal;
13 import akka.testkit.JavaTestKit;
14 import akka.testkit.TestActorRef;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import com.typesafe.config.Config;
19 import com.typesafe.config.ConfigFactory;
20 import com.typesafe.config.ConfigValueFactory;
21 import org.junit.AfterClass;
22 import org.junit.Before;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
32 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import scala.concurrent.Future;
40 import java.util.Collection;
41 import java.util.HashMap;
42 import java.util.HashSet;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.TimeUnit;
48 import static junit.framework.Assert.assertEquals;
49 import static org.junit.Assert.assertFalse;
50 import static org.junit.Assert.assertTrue;
51 import static org.mockito.Mockito.mock;
52 import static org.mockito.Mockito.when;
54 public class ShardManagerTest {
55 private static ActorSystem system;
58 public static void setUpClass() {
59 Map<String, String> myJournal = new HashMap<>();
60 myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
61 myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
62 Config config = ConfigFactory.load()
63 .withValue("akka.persistence.journal.plugin",
64 ConfigValueFactory.fromAnyRef("my-journal"))
65 .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
69 system = ActorSystem.create("test", config);
73 public static void tearDown() {
74 JavaTestKit.shutdownActorSystem(system);
79 public void setUpTest(){
84 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
86 new JavaTestKit(system) {
88 final Props props = ShardManager
89 .props("config", new MockClusterWrapper(),
90 new MockConfiguration(), new DatastoreContext());
92 final ActorRef subject = getSystem().actorOf(props);
94 subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
96 expectMsgEquals(duration("2 seconds"),
97 new PrimaryNotFound("inventory").toSerializable());
102 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
104 new JavaTestKit(system) {{
105 final Props props = ShardManager
106 .props("config", new MockClusterWrapper(),
107 new MockConfiguration(), new DatastoreContext());
109 final ActorRef subject = getSystem().actorOf(props);
111 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
113 subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
115 expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
120 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
122 new JavaTestKit(system) {{
123 final Props props = ShardManager
124 .props("config", new MockClusterWrapper(),
125 new MockConfiguration(), new DatastoreContext());
127 final ActorRef subject = getSystem().actorOf(props);
129 subject.tell(new FindLocalShard("inventory"), getRef());
131 final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
133 protected String match(Object in) {
134 if (in instanceof LocalShardNotFound) {
135 return ((LocalShardNotFound) in).getShardName();
140 }.get(); // this extracts the received message
142 assertEquals("inventory", out);
147 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
149 final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
151 new JavaTestKit(system) {{
152 final Props props = ShardManager
153 .props("config", mockClusterWrapper,
154 new MockConfiguration(), new DatastoreContext());
156 final ActorRef subject = getSystem().actorOf(props);
158 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160 subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
162 final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
164 protected ActorRef match(Object in) {
165 if (in instanceof LocalShardFound) {
166 return ((LocalShardFound) in).getPath();
171 }.get(); // this extracts the received message
173 assertTrue(out.path().toString(),
174 out.path().toString().contains("member-1-shard-default-config"));
179 public void testOnReceiveMemberUp() throws Exception {
181 new JavaTestKit(system) {{
182 final Props props = ShardManager
183 .props("config", new MockClusterWrapper(),
184 new MockConfiguration(), new DatastoreContext());
186 final ActorRef subject = getSystem().actorOf(props);
188 MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
190 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
192 final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
193 // do not put code outside this method, will run afterwards
195 protected String match(Object in) {
196 if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
197 PrimaryFound f = PrimaryFound.fromSerializable(in);
198 return f.getPrimaryPath();
203 }.get(); // this extracts the received message
205 assertTrue(out, out.contains("member-2-shard-astronauts-config"));
210 public void testOnReceiveMemberDown() throws Exception {
212 new JavaTestKit(system) {{
213 final Props props = ShardManager
214 .props("config", new MockClusterWrapper(),
215 new MockConfiguration(), new DatastoreContext());
217 final ActorRef subject = getSystem().actorOf(props);
219 MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
221 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
223 expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
225 MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
227 subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
229 expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
234 public void testOnRecoveryJournalIsEmptied(){
235 MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
236 ImmutableSet.of("foo")));
238 assertEquals(1, MyJournal.get().size());
240 new JavaTestKit(system) {{
241 final Props props = ShardManager
242 .props("config", new MockClusterWrapper(),
243 new MockConfiguration(), new DatastoreContext());
245 final ActorRef subject = getSystem().actorOf(props);
247 // Send message to check that ShardManager is ready
248 subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
250 expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
252 assertEquals(0, MyJournal.get().size());
257 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
258 new JavaTestKit(system) {{
259 final Props props = ShardManager
260 .props("config", new MockClusterWrapper(),
261 new MockConfiguration(), new DatastoreContext());
262 final TestActorRef<ShardManager> subject =
263 TestActorRef.create(system, props);
265 subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
267 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
269 assertTrue(knownModules.contains("foo"));
274 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
276 new JavaTestKit(system) {{
277 final Props props = ShardManager
278 .props("config", new MockClusterWrapper(),
279 new MockConfiguration(), new DatastoreContext());
280 final TestActorRef<ShardManager> subject =
281 TestActorRef.create(system, props);
283 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
285 assertEquals(0, knownModules.size());
287 SchemaContext schemaContext = mock(SchemaContext.class);
288 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
290 ModuleIdentifier foo = mock(ModuleIdentifier.class);
291 when(foo.getNamespace()).thenReturn(new URI("foo"));
293 moduleIdentifierSet.add(foo);
295 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
297 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
299 assertTrue(knownModules.contains("foo"));
301 assertEquals(1, knownModules.size());
303 ModuleIdentifier bar = mock(ModuleIdentifier.class);
304 when(bar.getNamespace()).thenReturn(new URI("bar"));
306 moduleIdentifierSet.add(bar);
308 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
310 assertTrue(knownModules.contains("bar"));
312 assertEquals(2, knownModules.size());
320 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
322 new JavaTestKit(system) {{
323 final Props props = ShardManager
324 .props("config", new MockClusterWrapper(),
325 new MockConfiguration(), new DatastoreContext());
326 final TestActorRef<ShardManager> subject =
327 TestActorRef.create(system, props);
329 Collection<String> knownModules = subject.underlyingActor().getKnownModules();
331 assertEquals(0, knownModules.size());
333 SchemaContext schemaContext = mock(SchemaContext.class);
334 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
336 ModuleIdentifier foo = mock(ModuleIdentifier.class);
337 when(foo.getNamespace()).thenReturn(new URI("foo"));
339 moduleIdentifierSet.add(foo);
341 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
343 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
345 assertTrue(knownModules.contains("foo"));
347 assertEquals(1, knownModules.size());
349 //Create a completely different SchemaContext with only the bar module in it
350 schemaContext = mock(SchemaContext.class);
351 moduleIdentifierSet = new HashSet<>();
352 ModuleIdentifier bar = mock(ModuleIdentifier.class);
353 when(bar.getNamespace()).thenReturn(new URI("bar"));
355 moduleIdentifierSet.add(bar);
357 subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
359 assertFalse(knownModules.contains("bar"));
361 assertEquals(1, knownModules.size());
368 private void sleep(long period){
369 Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
372 public static class MyJournal extends AsyncWriteJournal {
374 private static Map<Long, Object> journal = Maps.newTreeMap();
376 public static void addToJournal(Long sequenceNr, Object value){
377 journal.put(sequenceNr, value);
380 public static Map<Long, Object> get(){
384 public static void clear(){
388 @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
389 final Procedure<PersistentRepr> replayCallback) {
390 if(journal.size() == 0){
391 return Futures.successful(null);
393 return Futures.future(new Callable<Void>() {
395 public Void call() throws Exception {
396 for (Map.Entry<Long, Object> entry : journal.entrySet()) {
397 PersistentRepr persistentMessage =
398 new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
400 replayCallback.apply(persistentMessage);
404 }, context().dispatcher());
407 @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
408 return Futures.successful(-1L);
411 @Override public Future<Void> doAsyncWriteMessages(
412 final Iterable<PersistentRepr> persistentReprs) {
413 return Futures.future(new Callable<Void>() {
415 public Void call() throws Exception {
416 for (PersistentRepr repr : persistentReprs){
417 if(repr.payload() instanceof ShardManager.SchemaContextModules) {
418 journal.put(repr.sequenceNr(), repr.payload());
423 }, context().dispatcher());
426 @Override public Future<Void> doAsyncWriteConfirmations(
427 Iterable<PersistentConfirmation> persistentConfirmations) {
428 return Futures.successful(null);
431 @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
434 return Futures.successful(null);
437 @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
439 return Futures.successful(null);