- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
-
- List<String> members =
- configuration.getMembersFromShardName(shardName);
-
- for(String memberName : members) {
- if (memberName.equals(cluster.getCurrentMemberName())) {
- // This is a local shard
- ActorPath shardPath = localShards.get(shardName);
- // FIXME: This check may be redundant
- if (shardPath == null) {
- getSender()
- .tell(new PrimaryNotFound(shardName), getSelf());
- return;
- }
- getSender().tell(new PrimaryFound(shardPath.toString()),
- getSelf());
- return;
- } else {
- Address address = memberNameToAddress.get(shardName);
- if(address != null){
- String path =
- address.toString() + "/user/" + getShardActorName(
- memberName, shardName);
- getSender().tell(new PrimaryFound(path), getSelf());
- }
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ throw new Exception ("Not recognized message received, message="+message);
+ }
+
+ }
+
+ private void ignoreMessage(Object message){
+ LOG.debug("Unhandled message : " + message);
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ memberNameToAddress.put(message.member().roles().head(), message.member().address());
+ }