import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.OneForOneStrategy;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.SupervisorStrategy;
+import akka.cluster.ClusterEvent;
import akka.japi.Creator;
+import akka.japi.Function;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.List;
*/
public class ShardManager extends AbstractUntypedActor {
- // Stores a mapping between a shard name and the address of the current primary
- private final Map<String, Address> shardNameToPrimaryAddress =
- new HashMap<>();
-
// Stores a mapping between a member name and the address of the member
private final Map<String, Address> memberNameToAddress = new HashMap<>();
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map<String, List<String>> shardNameToMembers =
- new HashMap<>();
-
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
-
-
private final Map<String, ActorPath> localShards = new HashMap<>();
* configuration or operational
*/
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
- this.type = type;
- this.cluster = cluster;
- this.configuration = configuration;
- String memberName = cluster.getCurrentMemberName();
- List<String> memberShardNames =
- configuration.getMemberShardNames(memberName);
- for(String shardName : memberShardNames){
- String shardActorName = getShardActorName(memberName, shardName);
- ActorRef actor = getContext()
- .actorOf(Shard.props(shardActorName), shardActorName);
- ActorPath path = actor.path();
- localShards.put(shardName, path);
- }
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ // Create all the local Shards and make them a child of the ShardManager
+ // TODO: This may need to be initiated when we first get the schema context
+ createLocalShards();
}
public static Props props(final String type,
});
}
+
@Override
public void handleReceive(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
-
- List<String> members =
- configuration.getMembersFromShardName(shardName);
-
- for(String memberName : members) {
- if (memberName.equals(cluster.getCurrentMemberName())) {
- // This is a local shard
- ActorPath shardPath = localShards.get(shardName);
- // FIXME: This check may be redundant
- if (shardPath == null) {
- getSender()
- .tell(new PrimaryNotFound(shardName), getSelf());
- return;
- }
- getSender().tell(new PrimaryFound(shardPath.toString()),
- getSelf());
- return;
- } else {
- Address address = memberNameToAddress.get(shardName);
- if(address != null){
- String path =
- address.toString() + "/user/" + getShardActorName(
- memberName, shardName);
- getSender().tell(new PrimaryFound(path), getSelf());
- }
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ throw new Exception ("Not recognized message received, message="+message);
+ }
+
+ }
+
+ private void ignoreMessage(Object message){
+ LOG.debug("Unhandled message : " + message);
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ memberNameToAddress.put(message.member().roles().head(), message.member().address());
+ }
+ private void updateSchemaContext(Object message) {
+ for(ActorPath path : localShards.values()){
+ getContext().system().actorSelection(path)
+ .forward(message,
+ getContext());
+ }
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+ for(String memberName : members) {
+ if (memberName.equals(cluster.getCurrentMemberName())) {
+ // This is a local shard
+ ActorPath shardPath = localShards.get(shardName);
+ if (shardPath == null) {
+ getSender()
+ .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ return;
+ }
+ getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
+ getSelf());
+ return;
+ } else {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
+ memberName, shardName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
}
- }
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
- } else if (message instanceof UpdateSchemaContext) {
- for(ActorPath path : localShards.values()){
- getContext().system().actorSelection(path)
- .forward(message,
- getContext());
}
}
+
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
}
private String getShardActorName(String memberName, String shardName){
return memberName + "-shard-" + shardName + "-" + this.type;
}
+ // Create the shards that are local to this member
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+ for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardActorName), shardActorName);
+ ActorPath path = actor.path();
+ localShards.put(shardName, path);
+ }
+
+ }
+
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }
}