+ // Create the shards that are local to this member
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardActorName, peerAddresses),
+ shardActorName);
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+ }
+
+ }
+
+ private Map<String, String> getPeerAddresses(String shardName){
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ String shardActorName = getShardActorName(memberName, shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardActorName, path);
+ }
+ }
+ return peerAddresses;
+ }
+
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ return SupervisorStrategy.resume();
+ }
+ }
+ );