Bug-2136 - fix for is-local-path 00/12700/8
authorKamal Rameshan <kramesha@cisco.com>
Mon, 10 Nov 2014 22:35:39 +0000 (14:35 -0800)
committerKamal Rameshan <kramesha@cisco.com>
Thu, 13 Nov 2014 23:06:59 +0000 (23:06 +0000)
Change-Id: I38a58683c94c54e9f399df56aa586e3916f75e41
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java

index 58d805b2b50680be8ab1feb6c6d30bb365bfdf64..81a8c7e53f6f5af2aeb6694328005a3d2fe2beda 100644 (file)
@@ -9,9 +9,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.Address;
 
 public interface ClusterWrapper {
     void subscribeToMemberEvents(ActorRef actorRef);
     String getCurrentMemberName();
-    String getSelfAddress();
+    Address getSelfAddress();
 }
index 857510ad4b5360ef3870ac16bce00e188cee88a7..4edd60a33af436fff36e77627657d2cd7844a6c3 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import com.google.common.base.Preconditions;
@@ -17,7 +18,7 @@ import com.google.common.base.Preconditions;
 public class ClusterWrapperImpl implements ClusterWrapper {
     private final Cluster cluster;
     private final String currentMemberName;
-    private final String selfAddress;
+    private final Address selfAddress;
 
     public ClusterWrapperImpl(ActorSystem actorSystem){
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -32,7 +33,7 @@ public class ClusterWrapperImpl implements ClusterWrapper {
         );
 
         currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
-        selfAddress = cluster.selfAddress().toString();
+        selfAddress = cluster.selfAddress();
 
     }
 
@@ -48,7 +49,7 @@ public class ClusterWrapperImpl implements ClusterWrapper {
         return currentMemberName;
     }
 
-    public String getSelfAddress() {
+    public Address getSelfAddress() {
         return selfAddress;
     }
 }
index 443e0af9e031392fe1ebb08d03d956b883b91dab..ebed05b6a7126170f2bba39bff4ced72bf9908a2 100644 (file)
@@ -710,7 +710,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // TxActor is always created where the leader of the shard is.
             // Check if TxActor is created in the same node
-            boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+            boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
index 904dcdf43989bebfeb180b5f56c489cbf035621a..f81c2a87cd8694dab4c5c194a3a3be05af6bc282 100644 (file)
@@ -12,12 +12,15 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -41,7 +44,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
+
 import static akka.pattern.Patterns.ask;
 
 /**
@@ -80,6 +83,7 @@ public class ActorContext {
     private volatile SchemaContext schemaContext;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
+    private final String selfAddressHostPort;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -99,6 +103,13 @@ public class ActorContext {
         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
                 TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+
+        Address selfAddress = clusterWrapper.getSelfAddress();
+        if (selfAddress != null && !selfAddress.host().isEmpty()) {
+            selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
+        } else {
+            selfAddressHostPort = null;
+        }
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -370,30 +381,31 @@ public class ActorContext {
         return operationDuration;
     }
 
-    public boolean isLocalPath(String path) {
-        String selfAddress = clusterWrapper.getSelfAddress();
-        if (path == null || selfAddress == null) {
+    public boolean isPathLocal(String path) {
+        if (Strings.isNullOrEmpty(path)) {
             return false;
         }
 
-        int atIndex1 = path.indexOf("@");
-        int atIndex2 = selfAddress.indexOf("@");
+        int pathAtIndex = path.indexOf("@");
+        if (pathAtIndex == -1) {
+            //if the path is of local format, then its local and is co-located
+            return true;
 
-        if (atIndex1 == -1 || atIndex2 == -1) {
-            return false;
-        }
+        } else if (selfAddressHostPort != null) {
+            // self-address and tx actor path, both are of remote path format
+            int slashIndex = path.indexOf("/", pathAtIndex);
+
+            if (slashIndex == -1) {
+                return false;
+            }
 
-        int slashIndex1 = path.indexOf("/", atIndex1);
-        int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+            String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+            return hostPort.equals(selfAddressHostPort);
 
-        if (slashIndex1 == -1 || slashIndex2 == -1) {
+        } else {
+            // self address is local format and tx actor path is remote format
             return false;
         }
-
-        String hostPort1 = path.substring(atIndex1, slashIndex1);
-        String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
-
-        return hostPort1.equals(hostPort2);
     }
 
     /**
index b77b0b65cf86edb06f370e768725c5f86cb3a439..cdf085e2ff2bd1c069281cfc8b96fdfc50c1382f 100644 (file)
@@ -358,7 +358,7 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
-        doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
         return actorRef;
     }
@@ -899,7 +899,7 @@ public class TransactionProxyTest {
         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -1069,7 +1069,7 @@ public class TransactionProxyTest {
             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                 eqCreateTransaction(memberName, READ_ONLY));
 
-        doReturn(true).when(mockActorContext).isLocalPath(actorPath);
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
 
@@ -1124,7 +1124,7 @@ public class TransactionProxyTest {
         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                 eqCreateTransaction(memberName, WRITE_ONLY));
 
-        doReturn(true).when(mockActorContext).isLocalPath(actorPath);
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
index 39d337e91b6005dc938e98232bbf105fc13ce220..fcb0324bea77e1608dbd8b6d3d7f2077d2c27c4c 100644 (file)
@@ -3,11 +3,13 @@ package org.opendaylight.controller.cluster.datastore.utils;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -19,8 +21,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -158,32 +158,67 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testIsLocalPath() {
+    public void testIsPathLocal() {
         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
-        ActorContext actorContext =
-                new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        ActorContext actorContext = null;
 
-        clusterWrapper.setSelfAddress("");
-        assertEquals(false, actorContext.isLocalPath(null));
-        assertEquals(false, actorContext.isLocalPath(""));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(null));
+        assertEquals(false, actorContext.isPathLocal(""));
 
         clusterWrapper.setSelfAddress(null);
-        assertEquals(false, actorContext.isLocalPath(""));
-
-        clusterWrapper.setSelfAddress("akka://test/user/$b");
-        assertEquals(false, actorContext.isLocalPath("akka://test/user/$a"));
-
-        clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/");
-        assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
-        clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550");
-        assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
-        clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/");
-        assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
-        clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
-        assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(""));
+
+        // even if the path is in local format, match the primary path (first 3 elements) and return true
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+        // self address of remote format,but Tx path local format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal(
+            "akka://system/user/shardmanager/shard/transaction"));
+
+        // self address of local format,but Tx path remote format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(
+            "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+        //local path but not same
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+        //ip and port same
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+
+        // forward-slash missing in address
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+
+        //ips differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+
+        //ports differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
     }
 
     @Test
index b80506d17d2efa61f5ae5673d3687dc67501bdee..fe40aa0fd4571c65f431124d3b58704b9b62b9c6 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
+import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.ClusterEvent;
 import akka.cluster.MemberStatus;
@@ -20,7 +21,7 @@ import java.util.Set;
 
 public class MockClusterWrapper implements ClusterWrapper{
 
-    private String selfAddress = "akka.tcp://test@127.0.0.1:2550/user/member-1-shard-test-config";
+    private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
 
     @Override
     public void subscribeToMemberEvents(ActorRef actorRef) {
@@ -32,11 +33,11 @@ public class MockClusterWrapper implements ClusterWrapper{
     }
 
     @Override
-    public String getSelfAddress() {
+    public Address getSelfAddress() {
         return selfAddress;
     }
 
-    public void setSelfAddress(String selfAddress) {
+    public void setSelfAddress(Address selfAddress) {
         this.selfAddress = selfAddress;
     }