From a8477c3af9b6b2de6037404f11e1835ac6c710c2 Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Mon, 10 Nov 2014 14:35:39 -0800 Subject: [PATCH] Bug-2136 - fix for is-local-path Change-Id: I38a58683c94c54e9f399df56aa586e3916f75e41 Signed-off-by: Kamal Rameshan --- .../cluster/datastore/ClusterWrapper.java | 3 +- .../cluster/datastore/ClusterWrapperImpl.java | 7 +- .../cluster/datastore/TransactionProxy.java | 2 +- .../cluster/datastore/utils/ActorContext.java | 46 ++++++---- .../datastore/TransactionProxyTest.java | 8 +- .../datastore/utils/ActorContextTest.java | 83 +++++++++++++------ .../datastore/utils/MockClusterWrapper.java | 7 +- 7 files changed, 103 insertions(+), 53 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java index 58d805b2b5..81a8c7e53f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java @@ -9,9 +9,10 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.Address; public interface ClusterWrapper { void subscribeToMemberEvents(ActorRef actorRef); String getCurrentMemberName(); - String getSelfAddress(); + Address getSelfAddress(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java index 857510ad4b..4edd60a33a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import com.google.common.base.Preconditions; @@ -17,7 +18,7 @@ import com.google.common.base.Preconditions; public class ClusterWrapperImpl implements ClusterWrapper { private final Cluster cluster; private final String currentMemberName; - private final String selfAddress; + private final Address selfAddress; public ClusterWrapperImpl(ActorSystem actorSystem){ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -32,7 +33,7 @@ public class ClusterWrapperImpl implements ClusterWrapper { ); currentMemberName = (String) cluster.getSelfRoles().toArray()[0]; - selfAddress = cluster.selfAddress().toString(); + selfAddress = cluster.selfAddress(); } @@ -48,7 +49,7 @@ public class ClusterWrapperImpl implements ClusterWrapper { return currentMemberName; } - public String getSelfAddress() { + public Address getSelfAddress() { return selfAddress; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 443e0af9e0..ebed05b6a7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -710,7 +710,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // TxActor is always created where the leader of the shard is. // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 904dcdf439..f81c2a87cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -12,12 +12,15 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -41,7 +44,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; + import static akka.pattern.Patterns.ask; /** @@ -80,6 +83,7 @@ public class ActorContext { private volatile SchemaContext schemaContext; private final FiniteDuration operationDuration; private final Timeout operationTimeout; + private final String selfAddressHostPort; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -99,6 +103,13 @@ public class ActorContext { operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + + Address selfAddress = clusterWrapper.getSelfAddress(); + if (selfAddress != null && !selfAddress.host().isEmpty()) { + selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get(); + } else { + selfAddressHostPort = null; + } } public DatastoreContext getDatastoreContext() { @@ -370,30 +381,31 @@ public class ActorContext { return operationDuration; } - public boolean isLocalPath(String path) { - String selfAddress = clusterWrapper.getSelfAddress(); - if (path == null || selfAddress == null) { + public boolean isPathLocal(String path) { + if (Strings.isNullOrEmpty(path)) { return false; } - int atIndex1 = path.indexOf("@"); - int atIndex2 = selfAddress.indexOf("@"); + int pathAtIndex = path.indexOf("@"); + if (pathAtIndex == -1) { + //if the path is of local format, then its local and is co-located + return true; - if (atIndex1 == -1 || atIndex2 == -1) { - return false; - } + } else if (selfAddressHostPort != null) { + // self-address and tx actor path, both are of remote path format + int slashIndex = path.indexOf("/", pathAtIndex); + + if (slashIndex == -1) { + return false; + } - int slashIndex1 = path.indexOf("/", atIndex1); - int slashIndex2 = selfAddress.indexOf("/", atIndex2); + String hostPort = path.substring(pathAtIndex + 1, slashIndex); + return hostPort.equals(selfAddressHostPort); - if (slashIndex1 == -1 || slashIndex2 == -1) { + } else { + // self address is local format and tx actor path is remote format return false; } - - String hostPort1 = path.substring(atIndex1, slashIndex1); - String hostPort2 = selfAddress.substring(atIndex2, slashIndex2); - - return hostPort1.equals(hostPort2); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index b77b0b65cf..cdf085e2ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -358,7 +358,7 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); - doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); return actorRef; } @@ -899,7 +899,7 @@ public class TransactionProxyTest { doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -1069,7 +1069,7 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); - doReturn(true).when(mockActorContext).isLocalPath(actorPath); + doReturn(true).when(mockActorContext).isPathLocal(actorPath); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); @@ -1124,7 +1124,7 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, WRITE_ONLY)); - doReturn(true).when(mockActorContext).isLocalPath(actorPath); + doReturn(true).when(mockActorContext).isPathLocal(actorPath); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 39d337e91b..fcb0324bea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -3,11 +3,13 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -19,8 +21,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -158,32 +158,67 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testIsLocalPath() { + public void testIsPathLocal() { MockClusterWrapper clusterWrapper = new MockClusterWrapper(); - ActorContext actorContext = - new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + ActorContext actorContext = null; - clusterWrapper.setSelfAddress(""); - assertEquals(false, actorContext.isLocalPath(null)); - assertEquals(false, actorContext.isLocalPath("")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal(null)); + assertEquals(false, actorContext.isPathLocal("")); clusterWrapper.setSelfAddress(null); - assertEquals(false, actorContext.isLocalPath("")); - - clusterWrapper.setSelfAddress("akka://test/user/$b"); - assertEquals(false, actorContext.isLocalPath("akka://test/user/$a")); - - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/"); - assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); - - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); - - clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); - - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("")); + + // even if the path is in local format, match the primary path (first 3 elements) and return true + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a")); + + // self address of remote format,but Tx path local format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal( + "akka://system/user/shardmanager/shard/transaction")); + + // self address of local format,but Tx path remote format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal( + "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction")); + + //local path but not same + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a")); + + //ip and port same + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/")); + + // forward-slash missing in address + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550")); + + //ips differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/")); + + //ports differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/")); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index b80506d17d..fe40aa0fd4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; +import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.ClusterEvent; import akka.cluster.MemberStatus; @@ -20,7 +21,7 @@ import java.util.Set; public class MockClusterWrapper implements ClusterWrapper{ - private String selfAddress = "akka.tcp://test@127.0.0.1:2550/user/member-1-shard-test-config"; + private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550); @Override public void subscribeToMemberEvents(ActorRef actorRef) { @@ -32,11 +33,11 @@ public class MockClusterWrapper implements ClusterWrapper{ } @Override - public String getSelfAddress() { + public Address getSelfAddress() { return selfAddress; } - public void setSelfAddress(String selfAddress) { + public void setSelfAddress(Address selfAddress) { this.selfAddress = selfAddress; } -- 2.36.6