X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=0363b3ceb370772ca617bb50abcddef95ed7da5e;hb=516a4b2ea78179c9bd6ebb584862e8fc686ebf08;hp=3e0c97f6afb8e33d1c9b580f4a96111d2a3b6884;hpb=de3e413b633b7555ae8f3fe2ec163dbb7dda5da8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 3e0c97f6af..0363b3ceb3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,14 +11,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.OneForOneStrategy; import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import akka.actor.SupervisorStrategy; +import akka.cluster.ClusterEvent; import akka.japi.Creator; +import akka.japi.Function; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import scala.concurrent.duration.Duration; import java.util.HashMap; import java.util.List; @@ -49,21 +53,9 @@ import java.util.Map; */ public class ShardManager extends AbstractUntypedActor { - // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = - new HashMap<>(); - // Stores a mapping between a member name and the address of the member private final Map memberNameToAddress = new HashMap<>(); - // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = - new HashMap<>(); - - private final LoggingAdapter log = - Logging.getLogger(getContext().system(), this); - - private final Map localShards = new HashMap<>(); @@ -78,20 +70,17 @@ public class ShardManager extends AbstractUntypedActor { * configuration or operational */ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { - this.type = type; - this.cluster = cluster; - this.configuration = configuration; - String memberName = cluster.getCurrentMemberName(); - List memberShardNames = - configuration.getMemberShardNames(memberName); - for(String shardName : memberShardNames){ - String shardActorName = getShardActorName(memberName, shardName); - ActorRef actor = getContext() - .actorOf(Shard.props(shardActorName), shardActorName); - ActorPath path = actor.path(); - localShards.put(shardName, path); - } + this.type = Preconditions.checkNotNull(type, "type should not be null"); + this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); + this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); + + // Subscribe this actor to cluster member events + cluster.subscribeToMemberEvents(getSelf()); + + // Create all the local Shards and make them a child of the ShardManager + // TODO: This may need to be initiated when we first get the schema context + createLocalShards(); } public static Props props(final String type, @@ -106,55 +95,113 @@ public class ShardManager extends AbstractUntypedActor { }); } + @Override public void handleReceive(Object message) throws Exception { - if (message instanceof FindPrimary) { - FindPrimary msg = ((FindPrimary) message); - String shardName = msg.getShardName(); - - List members = - configuration.getMembersFromShardName(shardName); - - for(String memberName : members) { - if (memberName.equals(cluster.getCurrentMemberName())) { - // This is a local shard - ActorPath shardPath = localShards.get(shardName); - // FIXME: This check may be redundant - if (shardPath == null) { - getSender() - .tell(new PrimaryNotFound(shardName), getSelf()); - return; - } - getSender().tell(new PrimaryFound(shardPath.toString()), - getSelf()); - return; - } else { - Address address = memberNameToAddress.get(shardName); - if(address != null){ - String path = - address.toString() + "/user/" + getShardActorName( - memberName, shardName); - getSender().tell(new PrimaryFound(path), getSelf()); - } + if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + findPrimary( + FindPrimary.fromSerializable(message)); + + } else if (message instanceof UpdateSchemaContext) { + updateSchemaContext(message); + } else if (message instanceof ClusterEvent.MemberUp){ + memberUp((ClusterEvent.MemberUp) message); + } else if(message instanceof ClusterEvent.MemberRemoved) { + memberRemoved((ClusterEvent.MemberRemoved) message); + } else if(message instanceof ClusterEvent.UnreachableMember) { + ignoreMessage(message); + } else{ + throw new Exception ("Not recognized message received, message="+message); + } + + } + + private void ignoreMessage(Object message){ + LOG.debug("Unhandled message : " + message); + } + + private void memberRemoved(ClusterEvent.MemberRemoved message) { + memberNameToAddress.remove(message.member().roles().head()); + } + + private void memberUp(ClusterEvent.MemberUp message) { + memberNameToAddress.put(message.member().roles().head(), message.member().address()); + } + private void updateSchemaContext(Object message) { + for(ActorPath path : localShards.values()){ + getContext().system().actorSelection(path) + .forward(message, + getContext()); + } + } + + private void findPrimary(FindPrimary message) { + String shardName = message.getShardName(); + + List members = + configuration.getMembersFromShardName(shardName); + for(String memberName : members) { + if (memberName.equals(cluster.getCurrentMemberName())) { + // This is a local shard + ActorPath shardPath = localShards.get(shardName); + if (shardPath == null) { + getSender() + .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + return; + } + getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(), + getSelf()); + return; + } else { + Address address = memberNameToAddress.get(memberName); + if(address != null){ + String path = + address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName( + memberName, shardName); + getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + return; } - } - getSender().tell(new PrimaryNotFound(shardName), getSelf()); - } else if (message instanceof UpdateSchemaContext) { - for(ActorPath path : localShards.values()){ - getContext().system().actorSelection(path) - .forward(message, - getContext()); } } + + getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); } private String getShardActorName(String memberName, String shardName){ return memberName + "-shard-" + shardName + "-" + this.type; } + // Create the shards that are local to this member + private void createLocalShards() { + String memberName = this.cluster.getCurrentMemberName(); + List memberShardNames = + this.configuration.getMemberShardNames(memberName); + for(String shardName : memberShardNames){ + String shardActorName = getShardActorName(memberName, shardName); + ActorRef actor = getContext() + .actorOf(Shard.props(shardActorName), shardActorName); + ActorPath path = actor.path(); + localShards.put(shardName, path); + } + + } + + + @Override + public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), + new Function() { + @Override + public SupervisorStrategy.Directive apply(Throwable t) { + return SupervisorStrategy.resume(); + } + } + ); + + } }