+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
+ private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+ }
+
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ List<String> localShardActorNames = new ArrayList<>();
+ for(String shardName : memberShardNames){
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ localShardActorNames.add(shardId.toString());
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
+ }
+
+ mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+ }
+
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
+ private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ ShardIdentifier shardId = getShardIdentifier(memberName,
+ shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId, path);
+ }
+ }
+ return peerAddresses;
+ }
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ StringBuilder sb = new StringBuilder();
+ for(StackTraceElement element : t.getStackTrace()) {
+ sb.append("\n\tat ")
+ .append(element.toString());
+ }
+ LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }
+
+ @Override
+ public String persistenceId() {
+ return "shard-manager-" + type;
+ }
+
+ @VisibleForTesting
+ Collection<String> getKnownModules() {
+ return knownModules;
+ }
+
+ @VisibleForTesting
+ DataPersistenceProvider getDataPersistenceProvider() {
+ return dataPersistenceProvider;
+ }
+
+ private class ShardInformation {
+ private final ShardIdentifier shardId;
+ private final String shardName;
+ private ActorRef actor;
+ private ActorPath actorPath;
+ private final Map<ShardIdentifier, String> peerAddresses;
+
+ // flag that determines if the actor is ready for business
+ private boolean actorInitialized = false;
+
+ private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+
+ private ShardInformation(String shardName, ShardIdentifier shardId,
+ Map<ShardIdentifier, String> peerAddresses) {
+ this.shardName = shardName;
+ this.shardId = shardId;
+ this.peerAddresses = peerAddresses;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ ActorRef getActor(){
+ return actor;
+ }
+
+ ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ void setActor(ActorRef actor) {
+ this.actor = actor;
+ this.actorPath = actor.path();
+ }
+
+ ShardIdentifier getShardId() {
+ return shardId;
+ }
+
+ Map<ShardIdentifier, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+ peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ if(actor != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+ }
+
+ actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+ }