- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+ @Override
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());