private final Map<String, ActorPath> localShards = new HashMap<>();
+ private final String type;
+
private final ClusterWrapper cluster;
+ private final Configuration configuration;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ this.type = type;
this.cluster = cluster;
+ this.configuration = configuration;
String memberName = cluster.getCurrentMemberName();
List<String> memberShardNames =
configuration.getMemberShardNames(memberName);
for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
- memberName + "-shard-" + shardName + "-" + type);
+ .actorOf(Shard.props(shardActorName), shardActorName);
ActorPath path = actor.path();
localShards.put(shardName, path);
}
FindPrimary msg = ((FindPrimary) message);
String shardName = msg.getShardName();
- if (Shard.DEFAULT_NAME.equals(shardName)) {
- ActorPath defaultShardPath = localShards.get(shardName);
- if(defaultShardPath == null){
- throw new IllegalStateException("local default shard not found");
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+
+ for(String memberName : members) {
+ if (memberName.equals(cluster.getCurrentMemberName())) {
+ // This is a local shard
+ ActorPath shardPath = localShards.get(shardName);
+ // FIXME: This check may be redundant
+ if (shardPath == null) {
+ getSender()
+ .tell(new PrimaryNotFound(shardName), getSelf());
+ return;
+ }
+ getSender().tell(new PrimaryFound(shardPath.toString()),
+ getSelf());
+ return;
+ } else {
+ Address address = memberNameToAddress.get(shardName);
+ if(address != null){
+ String path =
+ address.toString() + "/user/" + getShardActorName(
+ memberName, shardName);
+ getSender().tell(new PrimaryFound(path), getSelf());
+ }
+
+
}
- getSender().tell(new PrimaryFound(defaultShardPath.toString()),
- getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
}
+
+ getSender().tell(new PrimaryNotFound(shardName), getSelf());
+
} else if (message instanceof UpdateSchemaContext) {
for(ActorPath path : localShards.values()){
getContext().system().actorSelection(path)
}
}
+ private String getShardActorName(String memberName, String shardName){
+ return memberName + "-shard-" + shardName + "-" + this.type;
+ }
+
}