+ // Create the shards that are local to this member
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardActorName, peerAddresses),
+ shardActorName);
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+ }
+
+ }
+
+ private Map<String, String> getPeerAddresses(String shardName){
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ String shardActorName = getShardActorName(memberName, shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardActorName, path);
+ }
+ }
+ return peerAddresses;
+ }
+
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }
+
+ private class ShardInformation {
+ private final String shardName;
+ private final ActorRef actor;
+ private final ActorPath actorPath;
+ private final Map<String, String> peerAddresses;
+
+ private ShardInformation(String shardName, ActorRef actor,
+ Map<String, String> peerAddresses) {
+ this.shardName = shardName;
+ this.actor = actor;
+ this.actorPath = actor.path();
+ this.peerAddresses = peerAddresses;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getActor(){
+ return actor;
+ }
+
+ public ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public void updatePeerAddress(String peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+
+ actor
+ .tell(new PeerAddressResolved(peerId, peerAddress),
+ getSelf());
+
+ }
+ }
+ }