import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* The ShardManager has the following jobs,
* <li> Monitor the cluster members and store their addresses
* <ul>
*/
-public class ShardManager extends AbstractUntypedActorWithMetering {
+public class ShardManager extends AbstractUntypedPersistentActor {
+
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
private final DatastoreContext datastoreContext;
+ private final Collection<String> knownModules = new HashSet<>(128);
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
}
@Override
- public void handleReceive(Object message) throws Exception {
+ public void handleCommand(Object message) throws Exception {
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
findPrimary(
FindPrimary.fromSerializable(message));
}
+ @Override protected void handleRecover(Object message) throws Exception {
+
+ if(message instanceof SchemaContextModules){
+ SchemaContextModules msg = (SchemaContextModules) message;
+ knownModules.clear();
+ knownModules.addAll(msg.getModules());
+ } else if(message instanceof RecoveryFailure){
+ RecoveryFailure failure = (RecoveryFailure) message;
+ LOG.error(failure.cause(), "Recovery failed");
+ } else if(message instanceof RecoveryCompleted){
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal except the last one
+ deleteMessages(lastSequenceNr() - 1);
+ }
+ }
+
private void findLocalShard(FindLocalShard message) {
ShardInformation shardInformation =
localShards.get(message.getShardName());
*
* @param message
*/
- private void updateSchemaContext(Object message) {
- SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+ private void updateSchemaContext(final Object message) {
+ final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+ Set<String> newModules = new HashSet<>(128);
+
+ for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+ String s = moduleIdentifier.getNamespace().toString();
+ newModules.add(s);
+ }
+
+ if(newModules.containsAll(knownModules)) {
+
+ LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+ knownModules.clear();
+ knownModules.addAll(newModules);
+
+ persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
- if(localShards.size() == 0){
- createLocalShards(schemaContext);
+ @Override public void apply(SchemaContextModules param) throws Exception {
+ LOG.info("Sending new SchemaContext to Shards");
+ if (localShards.size() == 0) {
+ createLocalShards(schemaContext);
+ } else {
+ for (ShardInformation info : localShards.values()) {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }
+
+ });
} else {
- for (ShardInformation info : localShards.values()) {
- info.getActor().tell(message, getSelf());
- }
+ LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
}
+
}
private void findPrimary(FindPrimary message) {
}
+ @Override public String persistenceId() {
+ return "shard-manager-" + type;
+ }
+
+ @VisibleForTesting public Collection<String> getKnownModules() {
+ return knownModules;
+ }
+
private class ShardInformation {
private final String shardName;
private final ActorRef actor;
return new ShardManager(type, cluster, configuration, datastoreContext);
}
}
+
+ static class SchemaContextModules implements Serializable {
+ private final Set<String> modules;
+
+ SchemaContextModules(Set<String> modules){
+ this.modules = modules;
+ }
+
+ public Set<String> getModules() {
+ return modules;
+ }
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import junit.framework.Assert;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ShardManagerTest {
private static ActorSystem system;
@BeforeClass
- public static void setUp() {
- system = ActorSystem.create("test");
+ public static void setUpClass() {
+ Map<String, String> myJournal = new HashMap<>();
+ myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
+ myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
+ Config config = ConfigFactory.load()
+ .withValue("akka.persistence.journal.plugin",
+ ConfigValueFactory.fromAnyRef("my-journal"))
+ .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
+
+ MyJournal.clear();
+
+ system = ActorSystem.create("test", config);
}
@AfterClass
system = null;
}
+ @Before
+ public void setUpTest(){
+ MyJournal.clear();
+ }
+
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+ new JavaTestKit(system) {
+ {
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
+ final ActorRef subject = getSystem().actorOf(props);
- subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
+ subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
- expectMsgEquals(Duration.Zero(),
- new PrimaryNotFound("inventory").toSerializable());
-
- expectNoMsg();
- }
- };
- }};
+ expectMsgEquals(duration("2 seconds"),
+ new PrimaryNotFound("inventory").toSerializable());
+ }};
}
@Test
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
new MockConfiguration(), new DatastoreContext());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
-
- subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
+ final ActorRef subject = getSystem().actorOf(props);
- subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
- expectNoMsg();
- }
- };
+ expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
}};
}
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
new MockConfiguration(), new DatastoreContext());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new FindLocalShard("inventory"), getRef());
-
- final String out = new ExpectMsg<String>(duration("10 seconds"), "find local") {
- @Override
- protected String match(Object in) {
- if (in instanceof LocalShardNotFound) {
- return ((LocalShardNotFound) in).getShardName();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ final ActorRef subject = getSystem().actorOf(props);
- assertEquals("inventory", out);
+ subject.tell(new FindLocalShard("inventory"), getRef());
- expectNoMsg();
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
+ @Override
+ protected String match(Object in) {
+ if (in instanceof LocalShardNotFound) {
+ return ((LocalShardNotFound) in).getShardName();
+ } else {
+ throw noMatch();
+ }
}
- };
+ }.get(); // this extracts the received message
+
+ assertEquals("inventory", out);
}};
}
final Props props = ShardManager
.props("config", mockClusterWrapper,
new MockConfiguration(), new DatastoreContext());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+
+ final ActorRef subject = getSystem().actorOf(props);
subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- new Within(duration("10 seconds")) {
+ subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+ final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
@Override
- protected void run() {
-
- subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
-
- final ActorRef out = new ExpectMsg<ActorRef>(duration("10 seconds"), "find local") {
- @Override
- protected ActorRef match(Object in) {
- if (in instanceof LocalShardFound) {
- return ((LocalShardFound) in).getPath();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ protected ActorRef match(Object in) {
+ if (in instanceof LocalShardFound) {
+ return ((LocalShardFound) in).getPath();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue(out.path().toString(),
+ out.path().toString().contains("member-1-shard-default-config"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMemberUp() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
- assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+ final ActorRef subject = getSystem().actorOf(props);
+ MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
- expectNoMsg();
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound f = PrimaryFound.fromSerializable(in);
+ return f.getPrimaryPath();
+ } else {
+ throw noMatch();
+ }
}
- };
+ }.get(); // this extracts the received message
+
+ assertTrue(out, out.contains("member-2-shard-astronauts-config"));
}};
}
@Test
- public void testOnReceiveMemberUp() throws Exception {
+ public void testOnReceiveMemberDown() throws Exception {
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+
+ final ActorRef subject = getSystem().actorOf(props);
+
+ MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+
+ MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryJournalIsEmptied(){
+ MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+
+ assertEquals(1, MyJournal.get().size());
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+
+ final ActorRef subject = getSystem().actorOf(props);
+
+ // Send message to check that ShardManager is ready
+ subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+
+ assertEquals(0, MyJournal.get().size());
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- // the run() method needs to finish within 3 seconds
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
-
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound f = PrimaryFound.fromSerializable(in);
- return f.getPrimaryPath();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
- Assert.assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
- expectNoMsg();
- }
- };
+ assertTrue(knownModules.contains("foo"));
}};
}
@Test
- public void testOnReceiveMemberDown() throws Exception {
+ public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+ throws Exception {
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+ assertEquals(0, knownModules.size());
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ moduleIdentifierSet.add(foo);
+
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("foo"));
+
+ assertEquals(1, knownModules.size());
+
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("bar"));
+ assertEquals(2, knownModules.size());
+
+ }};
+
+ }
+
+
+ @Test
+ public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+ throws Exception {
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- // the run() method needs to finish within 3 seconds
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
- MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+ assertEquals(0, knownModules.size());
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
- expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
- MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+ moduleIdentifierSet.add(foo);
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
- expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("foo"));
+
+ assertEquals(1, knownModules.size());
+
+ //Create a completely different SchemaContext with only the bar module in it
+ schemaContext = mock(SchemaContext.class);
+ moduleIdentifierSet = new HashSet<>();
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertFalse(knownModules.contains("bar"));
+
+ assertEquals(1, knownModules.size());
- expectNoMsg();
- }
- };
}};
+
+ }
+
+
+ private void sleep(long period){
+ Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
}
+ public static class MyJournal extends AsyncWriteJournal {
+
+ private static Map<Long, Object> journal = Maps.newTreeMap();
+
+ public static void addToJournal(Long sequenceNr, Object value){
+ journal.put(sequenceNr, value);
+ }
+
+ public static Map<Long, Object> get(){
+ return journal;
+ }
+
+ public static void clear(){
+ journal.clear();
+ }
+ @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
+ final Procedure<PersistentRepr> replayCallback) {
+ if(journal.size() == 0){
+ return Futures.successful(null);
+ }
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+ false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+ return Futures.successful(-1L);
+ }
+
+ @Override public Future<Void> doAsyncWriteMessages(
+ final Iterable<PersistentRepr> persistentReprs) {
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (PersistentRepr repr : persistentReprs){
+ if(repr.payload() instanceof ShardManager.SchemaContextModules) {
+ journal.put(repr.sequenceNr(), repr.payload());
+ }
+ }
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override public Future<Void> doAsyncWriteConfirmations(
+ Iterable<PersistentConfirmation> persistentConfirmations) {
+ return Futures.successful(null);
+ }
+
+ @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
+ boolean b) {
+ clear();
+ return Futures.successful(null);
+ }
+
+ @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+ clear();
+ return Futures.successful(null);
+ }
+ }
}