X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=81236521d649a48902de103c52888488559112ca;hb=1e59825dbec7b354d76bd7efa6a61e4ad802c802;hp=4e2369d3758596bd1217670f8f3ec5a2438db36d;hpb=81aa5072801e6453306e296b91dba3dbeeaf046d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 4e2369d375..81236521d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -12,7 +12,6 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -48,56 +47,84 @@ import java.util.Map; *
  • When a local shard replica comes alive *

    */ -public class ShardManager extends UntypedActor { - - // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = new HashMap<>(); - - // Stores a mapping between a member name and the address of the member - private final Map memberNameToAddress = new HashMap<>(); - - // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = new HashMap<>(); - - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - private final ActorPath defaultShardPath; - - /** - * - * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be - * configuration or operational - */ - private ShardManager(String type){ - ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type)); - defaultShardPath = actor.path(); - } - - public static Props props(final String type){ - return Props.create(new Creator(){ - - @Override - public ShardManager create() throws Exception { - return new ShardManager(type); - } - }); - } - - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof FindPrimary) { - FindPrimary msg = ((FindPrimary) message); - String shardName = msg.getShardName(); - if(Shard.DEFAULT_NAME.equals(shardName)){ - getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf()); - } else { - getSender().tell(new PrimaryNotFound(shardName), getSelf()); - } - } else if(message instanceof UpdateSchemaContext){ - // FIXME : Notify all local shards of a context change - getContext().system().actorSelection(defaultShardPath).forward(message, getContext()); +public class ShardManager extends AbstractUntypedActor { + + // Stores a mapping between a shard name and the address of the current primary + private final Map shardNameToPrimaryAddress = + new HashMap<>(); + + // Stores a mapping between a member name and the address of the member + private final Map memberNameToAddress = new HashMap<>(); + + // Stores a mapping between the shard name and all the members on which a replica of that shard are available + private final Map> shardNameToMembers = + new HashMap<>(); + + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); + + + private final Map localShards = new HashMap<>(); + + + private final ClusterWrapper cluster; + + /** + * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be + * configuration or operational + */ + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + this.cluster = cluster; + String memberName = cluster.getCurrentMemberName(); + List memberShardNames = + configuration.getMemberShardNames(memberName); + + for(String shardName : memberShardNames){ + ActorRef actor = getContext() + .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type), + memberName + "-shard-" + shardName + "-" + type); + ActorPath path = actor.path(); + localShards.put(shardName, path); + } + } + + public static Props props(final String type, + final ClusterWrapper cluster, + final Configuration configuration) { + return Props.create(new Creator() { + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type, cluster, configuration); + } + }); + } + + @Override + public void handleReceive(Object message) throws Exception { + if (message instanceof FindPrimary) { + FindPrimary msg = ((FindPrimary) message); + String shardName = msg.getShardName(); + + + if (Shard.DEFAULT_NAME.equals(shardName)) { + ActorPath defaultShardPath = localShards.get(shardName); + if(defaultShardPath == null){ + throw new IllegalStateException("local default shard not found"); + } + getSender().tell(new PrimaryFound(defaultShardPath.toString()), + getSelf()); + } else { + getSender().tell(new PrimaryNotFound(shardName), getSelf()); + } + } else if (message instanceof UpdateSchemaContext) { + for(ActorPath path : localShards.values()){ + getContext().system().actorSelection(path) + .forward(message, + getContext()); + } + } } - } }