- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
-
- List<String> members =
- configuration.getMembersFromShardName(shardName);
-
- for(String memberName : members) {
- if (memberName.equals(cluster.getCurrentMemberName())) {
- // This is a local shard
- ActorPath shardPath = localShards.get(shardName);
- // FIXME: This check may be redundant
- if (shardPath == null) {
- getSender()
- .tell(new PrimaryNotFound(shardName), getSelf());
- return;
- }
- getSender().tell(new PrimaryFound(shardPath.toString()),
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ unknownMessage(message);
+ }
+
+ }
+
+ private void findLocalShard(FindLocalShard message) {
+ ShardInformation shardInformation =
+ localShards.get(message.getShardName());
+
+ if(shardInformation != null){
+ getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ return;
+ }
+
+ getSender().tell(new LocalShardNotFound(message.getShardName()),
+ getSelf());
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName , message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
+ private void updateSchemaContext(Object message) {
+ for(ShardInformation info : localShards.values()){
+ info.getActor().tell(message,getSelf());
+ }
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ // First see if the there is a local replica for the shard
+ ShardInformation info = localShards.get(shardName);
+ if(info != null) {
+ ActorPath shardPath = info.getActorPath();
+ if (shardPath != null) {
+ getSender()
+ .tell(
+ new PrimaryFound(shardPath.toString()).toSerializable(),