+ /**
+ * Send the message to each and every shard
+ *
+ * @param message
+ */
+ public void broadcast(Object message){
+ for(String shardName : configuration.getAllShardNames()){
+
+ Optional<ActorSelection> primary = findPrimaryShard(shardName);
+ if (primary.isPresent()) {
+ primary.get().tell(message, ActorRef.noSender());
+ } else {
+ LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
+ message.getClass().getSimpleName(), shardName);
+ }
+ }
+ }
+
+ public FiniteDuration getOperationDuration() {
+ return operationDuration;
+ }
+
+ public boolean isLocalPath(String path) {
+ String selfAddress = clusterWrapper.getSelfAddress();
+ if (path == null || selfAddress == null) {
+ return false;
+ }
+
+ int atIndex1 = path.indexOf("@");
+ int atIndex2 = selfAddress.indexOf("@");
+
+ if (atIndex1 == -1 || atIndex2 == -1) {
+ return false;
+ }
+
+ int slashIndex1 = path.indexOf("/", atIndex1);
+ int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+
+ if (slashIndex1 == -1 || slashIndex2 == -1) {
+ return false;
+ }
+
+ String hostPort1 = path.substring(atIndex1, slashIndex1);
+ String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
+
+ return hostPort1.equals(hostPort2);
+ }