/**
* Send the message to each and every shard
- *
- * @param message
*/
- public void broadcast(final Function<Short, Object> messageSupplier){
+ public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass){
for(final String shardName : configuration.getAllShardNames()){
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
- Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
- message.getClass().getSimpleName(), shardName, failure);
+ messageClass.getSimpleName(), shardName, failure);
} else {
+ Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}