- // Stores a mapping between a shard name and the address of the current primary
- private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
-
- // Stores a mapping between a member name and the address of the member
- private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
-
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- private final ActorPath defaultShardPath;
-
- /**
- *
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
- */
- private ShardManager(String type){
- ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
- defaultShardPath = actor.path();
- }
-
- public static Props props(final String type){
- return Props.create(new Creator<ShardManager>(){
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type);
- }
- });
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
- if(Shard.DEFAULT_NAME.equals(shardName)){
- getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
- }
- } else if(message instanceof UpdateSchemaContext){
- // FIXME : Notify all local shards of a context change
- getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
+ // Stores a mapping between a member name and the address of the member
+ private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+ private final Map<String, ShardInformation> localShards = new HashMap<>();
+
+
+ private final String type;
+
+ private final ClusterWrapper cluster;
+
+ private final Configuration configuration;
+
+ /**
+ * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+ * configuration or operational
+ */
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ // Create all the local Shards and make them a child of the ShardManager
+ // TODO: This may need to be initiated when we first get the schema context
+ createLocalShards();
+ }
+
+ public static Props props(final String type,
+ final ClusterWrapper cluster,
+ final Configuration configuration) {
+ return Props.create(new Creator<ShardManager>() {
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration);
+ }
+ });
+ }
+
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ throw new Exception ("Not recognized message received, message="+message);
+ }
+
+ }
+
+ private void ignoreMessage(Object message){
+ LOG.debug("Unhandled message : " + message);
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName , message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardActorName(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ private void updateSchemaContext(Object message) {
+ for(ShardInformation info : localShards.values()){
+ info.getActor().tell(message,getSelf());
+ }
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+
+ // First see if the there is a local replica for the shard
+ ShardInformation info = localShards.get(shardName);
+ if(info != null) {
+ ActorPath shardPath = info.getActorPath();
+ if (shardPath != null) {
+ getSender()
+ .tell(
+ new PrimaryFound(shardPath.toString()).toSerializable(),
+ getSelf());
+ return;
+ }
+ }
+
+ if(cluster.getCurrentMemberName() != null) {
+ members.remove(cluster.getCurrentMemberName());
+ }
+
+ // There is no way for us to figure out the primary (for now) so assume
+ // that one of the remote nodes is a primary
+ for(String memberName : members) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ getShardActorPath(shardName, memberName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
+ }
+ }
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ }
+
+ private String
+
+
+ getShardActorPath(String shardName, String memberName) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null) {
+ return address.toString() + "/user/shardmanager-" + this.type + "/"
+ + getShardActorName(
+ memberName, shardName);
+ }
+ return null;
+ }
+
+ private String getShardActorName(String memberName, String shardName){
+ return memberName + "-shard-" + shardName + "-" + this.type;