+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ List<String> localShardActorNames = new ArrayList<>();
+ for(String shardName : memberShardNames){
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
+ shardId.toString());
+ localShardActorNames.add(shardId.toString());
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+ }
+
+ mBean = ShardManagerInfo
+ .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
+
+ }
+
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
+ private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ ShardIdentifier shardId = getShardIdentifier(memberName,
+ shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId, path);
+ }
+ }
+ return peerAddresses;
+ }
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ LOG.warning("Supervisor Strategy of resume applied {}",t);
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }
+
+ private class ShardInformation {
+ private final String shardName;
+ private final ActorRef actor;
+ private final ActorPath actorPath;
+ private final Map<ShardIdentifier, String> peerAddresses;
+
+ private ShardInformation(String shardName, ActorRef actor,
+ Map<ShardIdentifier, String> peerAddresses) {
+ this.shardName = shardName;
+ this.actor = actor;
+ this.actorPath = actor.path();
+ this.peerAddresses = peerAddresses;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getActor(){
+ return actor;
+ }
+
+ public ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+ peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ LOG.debug(
+ "Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+
+ actor
+ .tell(new PeerAddressResolved(peerId, peerAddress),
+ getSelf());
+
+ }
+ }
+ }