+ private void memberUp(ClusterEvent.MemberUp message) {
+ memberNameToAddress.put(message.member().roles().head(), message.member().address());
+ }
+
+ private void updateSchemaContext(Object message) {
+ for(ActorPath path : localShards.values()){
+ getContext().system().actorSelection(path)
+ .forward(message,
+ getContext());
+ }
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+
+ for(String memberName : members) {
+ if (memberName.equals(cluster.getCurrentMemberName())) {
+ // This is a local shard
+ ActorPath shardPath = localShards.get(shardName);
+ if (shardPath == null) {
+ getSender()
+ .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ return;
+ }
+ getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
+ getSelf());
+ return;
+ } else {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
+ memberName, shardName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
+ }
+
+
+ }
+ }
+
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ }
+
+ private String getShardActorName(String memberName, String shardName){
+ return memberName + "-shard-" + shardName + "-" + this.type;
+ }
+
+ // Create the shards that are local to this member
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardActorName), shardActorName);
+ ActorPath path = actor.path();
+ localShards.put(shardName, path);
+ }
+
+ }
+
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }