+ */
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
+ }
+ }
+
+ public FiniteDuration getOperationDuration() {
+ return operationDuration;
+ }
+
+ public boolean isPathLocal(String path) {
+ if (Strings.isNullOrEmpty(path)) {
+ return false;
+ }
+
+ int pathAtIndex = path.indexOf('@');
+ if (pathAtIndex == -1) {
+ //if the path is of local format, then its local and is co-located
+ return true;
+
+ } else if (selfAddressHostPort != null) {
+ // self-address and tx actor path, both are of remote path format
+ int slashIndex = path.indexOf('/', pathAtIndex);
+
+ if (slashIndex == -1) {
+ return false;
+ }
+
+ String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+ return hostPort.equals(selfAddressHostPort);
+
+ } else {
+ // self address is local format and tx actor path is remote format
+ return false;
+ }
+ }
+
+ /**
+ * @deprecated This method is present only to support backward compatibility with Helium and should not be
+ * used any further
+ *
+ *
+ * @param primaryPath
+ * @param localPathOfRemoteActor
+ * @return
+ */
+ @Deprecated
+ public String resolvePath(final String primaryPath,
+ final String localPathOfRemoteActor) {
+ StringBuilder builder = new StringBuilder();
+ String[] primaryPathElements = primaryPath.split("/");
+ builder.append(primaryPathElements[0]).append("//")
+ .append(primaryPathElements[1]).append(primaryPathElements[2]);
+ String[] remotePathElements = localPathOfRemoteActor.split("/");
+ for (int i = 3; i < remotePathElements.length; i++) {
+ builder.append("/").append(remotePathElements[i]);
+ }
+
+ return builder.toString();
+ }
+
+ /**
+ * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+ * should begin throttling the operations
+ *
+ * Parking reading this configuration here because we need to get to the actor system settings