-public class ShardManager extends UntypedActor {
-
- // Stores a mapping between a shard name and the address of the current primary
- private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
-
- // Stores a mapping between a member name and the address of the member
- private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
-
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- private final ActorPath defaultShardPath;
-
- /**
- *
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
- */
- private ShardManager(String type){
- ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
- defaultShardPath = actor.path();
- }
-
- public static Props props(final String type){
- return Props.create(new Creator<ShardManager>(){
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type);
- }
- });
- }
-
- @Override
- public void onReceive(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
- if(Shard.DEFAULT_NAME.equals(shardName)){
- getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
- }
- } else if(message instanceof UpdateSchemaContext){
- // FIXME : Notify all local shards of a context change
- getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
+public class ShardManager extends AbstractUntypedActor {
+
+ // Stores a mapping between a shard name and the address of the current primary
+ private final Map<String, Address> shardNameToPrimaryAddress =
+ new HashMap<>();
+
+ // Stores a mapping between a member name and the address of the member
+ private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+ // Stores a mapping between the shard name and all the members on which a replica of that shard are available
+ private final Map<String, List<String>> shardNameToMembers =
+ new HashMap<>();
+
+ private final LoggingAdapter log =
+ Logging.getLogger(getContext().system(), this);
+
+
+ private final Map<String, ActorPath> localShards = new HashMap<>();
+
+
+ private final ClusterWrapper cluster;
+
+ /**
+ * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+ * configuration or operational
+ */
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ this.cluster = cluster;
+ String memberName = cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ configuration.getMemberShardNames(memberName);
+
+ for(String shardName : memberShardNames){
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
+ memberName + "-shard-" + shardName + "-" + type);
+ ActorPath path = actor.path();
+ localShards.put(shardName, path);
+ }
+ }
+
+ public static Props props(final String type,
+ final ClusterWrapper cluster,
+ final Configuration configuration) {
+ return Props.create(new Creator<ShardManager>() {
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration);
+ }
+ });
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message instanceof FindPrimary) {
+ FindPrimary msg = ((FindPrimary) message);
+ String shardName = msg.getShardName();
+
+
+ if (Shard.DEFAULT_NAME.equals(shardName)) {
+ ActorPath defaultShardPath = localShards.get(shardName);
+ if(defaultShardPath == null){
+ throw new IllegalStateException("local default shard not found");
+ }
+ getSender().tell(new PrimaryFound(defaultShardPath.toString()),
+ getSelf());
+ } else {
+ getSender().tell(new PrimaryNotFound(shardName), getSelf());
+ }
+ } else if (message instanceof UpdateSchemaContext) {
+ for(ActorPath path : localShards.values()){
+ getContext().system().actorSelection(path)
+ .forward(message,
+ getContext());
+ }
+ }