import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.OneForOneStrategy;
import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* The ShardManager has the following jobs,
- * <p>
+ * <ul>
* <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
- * <li> Engage in shard replica elections which decide which replica should be the primary
- * </p>
- * <p/>
- * <h3>>Creation of Shard replicas</h3
- * <p>
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- * </p>
- * <p/>
- * <h3> Replica Elections </h3>
- * <p/>
- * <p>
- * The Shard Manager uses multiple cues to initiate election.
- * <li> When a member of the cluster dies
- * <li> When a local shard replica dies
- * <li> When a local shard replica comes alive
- * </p>
+ * <li> Monitor the cluster members and store their addresses
+ * <ul>
*/
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- // Stores a mapping between a shard name and the address of the current primary
- private final Map<String, Address> shardNameToPrimaryAddress =
- new HashMap<>();
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
// Stores a mapping between a member name and the address of the member
+ // Member names look like "member-1", "member-2" etc and are as specified
+ // in configuration
private final Map<String, Address> memberNameToAddress = new HashMap<>();
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map<String, List<String>> shardNameToMembers =
- new HashMap<>();
+ // Stores a mapping between a shard name and it's corresponding information
+ // Shard names look like inventory, topology etc and are as specified in
+ // configuration
+ private final Map<String, ShardInformation> localShards = new HashMap<>();
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
+ // The type of a ShardManager reflects the type of the datastore itself
+ // A data store could be of type config/operational
+ private final String type;
+ private final ClusterWrapper cluster;
- private final Map<String, ActorPath> localShards = new HashMap<>();
+ private final Configuration configuration;
+ private ShardManagerInfoMBean mBean;
- private final ClusterWrapper cluster;
+ private final DatastoreContext datastoreContext;
+
+ private final Collection<String> knownModules = new HashSet<>(128);
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
- private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
- this.cluster = cluster;
- String memberName = cluster.getCurrentMemberName();
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext) {
+
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.datastoreContext = datastoreContext;
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ //createLocalShards(null);
+ }
+
+ public static Props props(final String type,
+ final ClusterWrapper cluster,
+ final Configuration configuration,
+ final DatastoreContext datastoreContext) {
+
+ Preconditions.checkNotNull(type, "type should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ unknownMessage(message);
+ }
+
+ }
+
+ private void onActorInitialized(Object message) {
+ final ActorRef sender = getSender();
+
+ if (sender == null) {
+ return; //why is a non-actor sending this message? Just ignore.
+ }
+
+ String actorName = sender.path().name();
+ //find shard name from actor name; actor name is stringified shardId
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+ if (shardId.getShardName() == null) {
+ return;
+ }
+ markShardAsInitialized(shardId.getShardName());
+ }
+
+ @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+ LOG.debug("Initializing shard [{}]", shardName);
+ ShardInformation shardInformation = localShards.get(shardName);
+ if (shardInformation != null) {
+ shardInformation.setShardInitialized(true);
+ }
+ }
+
+ @Override protected void handleRecover(Object message) throws Exception {
+
+ if(message instanceof SchemaContextModules){
+ SchemaContextModules msg = (SchemaContextModules) message;
+ knownModules.clear();
+ knownModules.addAll(msg.getModules());
+ } else if(message instanceof RecoveryFailure){
+ RecoveryFailure failure = (RecoveryFailure) message;
+ LOG.error(failure.cause(), "Recovery failed");
+ } else if(message instanceof RecoveryCompleted){
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal except the last one
+ deleteMessages(lastSequenceNr() - 1);
+ }
+ }
+
+ private void findLocalShard(FindLocalShard message) {
+ ShardInformation shardInformation = localShards.get(message.getShardName());
+
+ if(shardInformation == null){
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ return;
+ }
+
+ sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+ }
+
+ private void sendResponse(ShardInformation shardInformation, Object message) {
+ if (!shardInformation.isShardInitialized()) {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ return;
+ }
+
+ getSender().tell(message, getSelf());
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName, message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
+ private void updateSchemaContext(final Object message) {
+ final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+ Set<String> newModules = new HashSet<>(128);
+
+ for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+ String s = moduleIdentifier.getNamespace().toString();
+ newModules.add(s);
+ }
+
+ if(newModules.containsAll(knownModules)) {
+
+ LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+ knownModules.clear();
+ knownModules.addAll(newModules);
+
+ persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+ @Override public void apply(SchemaContextModules param) throws Exception {
+ LOG.info("Sending new SchemaContext to Shards");
+ if (localShards.size() == 0) {
+ createLocalShards(schemaContext);
+ } else {
+ for (ShardInformation info : localShards.values()) {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }
+
+ });
+ } else {
+ LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
+ }
+
+ }
+
+ private void findPrimary(FindPrimary message) {
+ final ActorRef sender = getSender();
+ String shardName = message.getShardName();
+
+ // First see if the there is a local replica for the shard
+ ShardInformation info = localShards.get(shardName);
+ if (info != null) {
+ ActorPath shardPath = info.getActorPath();
+ sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+ return;
+ }
+
+ List<String> members = configuration.getMembersFromShardName(shardName);
+
+ if(cluster.getCurrentMemberName() != null) {
+ members.remove(cluster.getCurrentMemberName());
+ }
+
+ /**
+ * FIXME: Instead of sending remote shard actor path back to sender,
+ * forward FindPrimary message to remote shard manager
+ */
+ // There is no way for us to figure out the primary (for now) so assume
+ // that one of the remote nodes is a primary
+ for(String memberName : members) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ getShardActorPath(shardName, memberName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
+ }
+ }
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ }
+
+ private String getShardActorPath(String shardName, String memberName) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString())
+ .append("/user/")
+ .append(ShardManagerIdentifier.builder().type(type).build().toString())
+ .append("/")
+ .append(getShardIdentifier(memberName, shardName));
+ return builder.toString();
+ }
+ return null;
+ }
+
+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
+ private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+ }
+
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
+ private void createLocalShards(SchemaContext schemaContext) {
+ String memberName = this.cluster.getCurrentMemberName();
List<String> memberShardNames =
- configuration.getMemberShardNames(memberName);
+ this.configuration.getMemberShardNames(memberName);
+ List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
- memberName + "-shard-" + shardName + "-" + type);
- ActorPath path = actor.path();
- localShards.put(shardName, path);
+ .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
+ shardId.toString());
+ localShardActorNames.add(shardId.toString());
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
+
+ mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
}
- public static Props props(final String type,
- final ClusterWrapper cluster,
- final Configuration configuration) {
- return Props.create(new Creator<ShardManager>() {
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
+ private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration);
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ ShardIdentifier shardId = getShardIdentifier(memberName,
+ shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId, path);
}
- });
+ }
+ return peerAddresses;
}
@Override
- public void handleReceive(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
-
+ public SupervisorStrategy supervisorStrategy() {
- if (Shard.DEFAULT_NAME.equals(shardName)) {
- ActorPath defaultShardPath = localShards.get(shardName);
- if(defaultShardPath == null){
- throw new IllegalStateException("local default shard not found");
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ StringBuilder sb = new StringBuilder();
+ for(StackTraceElement element : t.getStackTrace()) {
+ sb.append("\n\tat ")
+ .append(element.toString());
+ }
+ LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+ return SupervisorStrategy.resume();
}
- getSender().tell(new PrimaryFound(defaultShardPath.toString()),
- getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
}
- } else if (message instanceof UpdateSchemaContext) {
- for(ActorPath path : localShards.values()){
- getContext().system().actorSelection(path)
- .forward(message,
- getContext());
+ );
+
+ }
+
+ @Override public String persistenceId() {
+ return "shard-manager-" + type;
+ }
+
+ @VisibleForTesting public Collection<String> getKnownModules() {
+ return knownModules;
+ }
+
+ private class ShardInformation {
+ private final String shardName;
+ private final ActorRef actor;
+ private final ActorPath actorPath;
+ private final Map<ShardIdentifier, String> peerAddresses;
+ private boolean shardInitialized = false; //flag that determines if the actor is ready for business
+
+ private ShardInformation(String shardName, ActorRef actor,
+ Map<ShardIdentifier, String> peerAddresses) {
+ this.shardName = shardName;
+ this.actor = actor;
+ this.actorPath = actor.path();
+ this.peerAddresses = peerAddresses;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getActor(){
+ return actor;
+ }
+
+ public ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+ peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+ }
+ actor
+ .tell(new PeerAddressResolved(peerId, peerAddress),
+ getSelf());
+
}
}
+
+ public boolean isShardInitialized() {
+ return shardInitialized;
+ }
+
+ public void setShardInitialized(boolean shardInitialized) {
+ this.shardInitialized = shardInitialized;
+ }
+ }
+
+ private static class ShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+
+ final String type;
+ final ClusterWrapper cluster;
+ final Configuration configuration;
+ final DatastoreContext datastoreContext;
+
+ ShardManagerCreator(String type, ClusterWrapper cluster,
+ Configuration configuration, DatastoreContext datastoreContext) {
+ this.type = type;
+ this.cluster = cluster;
+ this.configuration = configuration;
+ this.datastoreContext = datastoreContext;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration, datastoreContext);
+ }
}
+ static class SchemaContextModules implements Serializable {
+ private final Set<String> modules;
+
+ SchemaContextModules(Set<String> modules){
+ this.modules = modules;
+ }
+ public Set<String> getModules() {
+ return modules;
+ }
+ }
}
+
+
+