+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendOperationAsync(ActorSelection actor, Object message) {
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+ Preconditions.checkArgument(message != null, "message must not be null");
+
+ LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+
+ actor.tell(message, ActorRef.noSender());
+ }
+
+ public void shutdown() {
+ shardManager.tell(PoisonPill.getInstance(), null);
+ actorSystem.shutdown();
+ }
+
+ public String getCurrentMemberName(){
+ return clusterWrapper.getCurrentMemberName();
+ }
+
+ /**
+ * Send the message to each and every shard
+ *
+ * @param message
+ */
+ public void broadcast(Object message){
+ for(String shardName : configuration.getAllShardNames()){
+
+ Optional<ActorSelection> primary = findPrimaryShard(shardName);
+ if (primary.isPresent()) {
+ primary.get().tell(message, ActorRef.noSender());
+ } else {
+ LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
+ message.getClass().getSimpleName(), shardName);
+ }
+ }
+ }
+
+ public FiniteDuration getOperationDuration() {
+ return operationDuration;
+ }