Merge "Bug 2265: Use new NormalizedNode streaming in messages"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index 904dcdf43989bebfeb180b5f56c489cbf035621a..f217d05bb21a12e6f92add47da5536c6f6fe12d9 100644 (file)
@@ -12,12 +12,15 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -41,7 +44,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
+
 import static akka.pattern.Patterns.ask;
 
 /**
@@ -80,6 +83,7 @@ public class ActorContext {
     private volatile SchemaContext schemaContext;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
+    private final String selfAddressHostPort;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -99,6 +103,13 @@ public class ActorContext {
         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
                 TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+
+        Address selfAddress = clusterWrapper.getSelfAddress();
+        if (selfAddress != null && !selfAddress.host().isEmpty()) {
+            selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
+        } else {
+            selfAddressHostPort = null;
+        }
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -268,7 +279,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
         return ask(actor, message, timeout);
     }
 
@@ -303,7 +314,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
         return ask(actor, message, timeout);
     }
@@ -330,7 +341,7 @@ public class ActorContext {
         Preconditions.checkArgument(actor != null, "actor must not be null");
         Preconditions.checkArgument(message != null, "message must not be null");
 
-        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
         actor.tell(message, ActorRef.noSender());
     }
@@ -370,30 +381,31 @@ public class ActorContext {
         return operationDuration;
     }
 
-    public boolean isLocalPath(String path) {
-        String selfAddress = clusterWrapper.getSelfAddress();
-        if (path == null || selfAddress == null) {
+    public boolean isPathLocal(String path) {
+        if (Strings.isNullOrEmpty(path)) {
             return false;
         }
 
-        int atIndex1 = path.indexOf("@");
-        int atIndex2 = selfAddress.indexOf("@");
+        int pathAtIndex = path.indexOf('@');
+        if (pathAtIndex == -1) {
+            //if the path is of local format, then its local and is co-located
+            return true;
 
-        if (atIndex1 == -1 || atIndex2 == -1) {
-            return false;
-        }
+        } else if (selfAddressHostPort != null) {
+            // self-address and tx actor path, both are of remote path format
+            int slashIndex = path.indexOf('/', pathAtIndex);
+
+            if (slashIndex == -1) {
+                return false;
+            }
 
-        int slashIndex1 = path.indexOf("/", atIndex1);
-        int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+            String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+            return hostPort.equals(selfAddressHostPort);
 
-        if (slashIndex1 == -1 || slashIndex2 == -1) {
+        } else {
+            // self address is local format and tx actor path is remote format
             return false;
         }
-
-        String hostPort1 = path.substring(atIndex1, slashIndex1);
-        String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
-
-        return hostPort1.equals(hostPort2);
     }
 
     /**