*/
public Object executeRemoteOperation(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+ actor.toString());
Future<Object> future = ask(actor, message, operationTimeout);
actor.tell(message, ActorRef.noSender());
}
+ public void sendShardOperationAsync(String shardName, Object message) {
+ ActorSelection primary = findPrimary(shardName);
+
+ primary.tell(message, ActorRef.noSender());
+ }
+
+
/**
* Execute an operation on the primary for a given shard
* <p>
return clusterWrapper.getCurrentMemberName();
}
+ /**
+ * Send the message to each and every shard
+ *
+ * @param message
+ */
+ public void broadcast(Object message){
+ for(String shardName : configuration.getAllShardNames()){
+ try {
+ sendShardOperationAsync(shardName, message);
+ } catch(Exception e){
+ LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e);
+ }
+ }
+ }
+
+ public FiniteDuration getOperationDuration() {
+ return operationDuration;
+ }
}