X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=81236521d649a48902de103c52888488559112ca;hp=f55774f09104f5fcf3d0623583dab81fb74ab9d8;hb=af255d4824ce1290d6e6b4c669a5d9b0c5960f34;hpb=321d959d82bf14a6826e5bf42419c1a1460a1cbc diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index f55774f091..81236521d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -49,54 +49,82 @@ import java.util.Map; */ public class ShardManager extends AbstractUntypedActor { - // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = new HashMap<>(); - - // Stores a mapping between a member name and the address of the member - private final Map memberNameToAddress = new HashMap<>(); - - // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = new HashMap<>(); - - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - private final ActorPath defaultShardPath; - - /** - * - * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be - * configuration or operational - */ - private ShardManager(String type){ - ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type); - defaultShardPath = actor.path(); - } - - public static Props props(final String type){ - return Props.create(new Creator(){ - - @Override - public ShardManager create() throws Exception { - return new ShardManager(type); - } - }); - } - - @Override - public void handleReceive(Object message) throws Exception { - if (message instanceof FindPrimary) { - FindPrimary msg = ((FindPrimary) message); - String shardName = msg.getShardName(); - if(Shard.DEFAULT_NAME.equals(shardName)){ - getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf()); - } else { - getSender().tell(new PrimaryNotFound(shardName), getSelf()); - } - } else if(message instanceof UpdateSchemaContext){ - // FIXME : Notify all local shards of a schemaContext change - getContext().system().actorSelection(defaultShardPath).forward(message, getContext()); + // Stores a mapping between a shard name and the address of the current primary + private final Map shardNameToPrimaryAddress = + new HashMap<>(); + + // Stores a mapping between a member name and the address of the member + private final Map memberNameToAddress = new HashMap<>(); + + // Stores a mapping between the shard name and all the members on which a replica of that shard are available + private final Map> shardNameToMembers = + new HashMap<>(); + + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); + + + private final Map localShards = new HashMap<>(); + + + private final ClusterWrapper cluster; + + /** + * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be + * configuration or operational + */ + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + this.cluster = cluster; + String memberName = cluster.getCurrentMemberName(); + List memberShardNames = + configuration.getMemberShardNames(memberName); + + for(String shardName : memberShardNames){ + ActorRef actor = getContext() + .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type), + memberName + "-shard-" + shardName + "-" + type); + ActorPath path = actor.path(); + localShards.put(shardName, path); + } + } + + public static Props props(final String type, + final ClusterWrapper cluster, + final Configuration configuration) { + return Props.create(new Creator() { + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type, cluster, configuration); + } + }); + } + + @Override + public void handleReceive(Object message) throws Exception { + if (message instanceof FindPrimary) { + FindPrimary msg = ((FindPrimary) message); + String shardName = msg.getShardName(); + + + if (Shard.DEFAULT_NAME.equals(shardName)) { + ActorPath defaultShardPath = localShards.get(shardName); + if(defaultShardPath == null){ + throw new IllegalStateException("local default shard not found"); + } + getSender().tell(new PrimaryFound(defaultShardPath.toString()), + getSelf()); + } else { + getSender().tell(new PrimaryNotFound(shardName), getSelf()); + } + } else if (message instanceof UpdateSchemaContext) { + for(ActorPath path : localShards.values()){ + getContext().system().actorSelection(path) + .forward(message, + getContext()); + } + } } - } }