import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
+
import static akka.pattern.Patterns.ask;
/**
private volatile SchemaContext schemaContext;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
+ private final String selfAddressHostPort;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+
+ Address selfAddress = clusterWrapper.getSelfAddress();
+ if (selfAddress != null && !selfAddress.host().isEmpty()) {
+ selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
+ } else {
+ selfAddressHostPort = null;
+ }
}
public DatastoreContext getDatastoreContext() {
return operationDuration;
}
- public boolean isLocalPath(String path) {
- String selfAddress = clusterWrapper.getSelfAddress();
- if (path == null || selfAddress == null) {
+ public boolean isPathLocal(String path) {
+ if (Strings.isNullOrEmpty(path)) {
return false;
}
- int atIndex1 = path.indexOf("@");
- int atIndex2 = selfAddress.indexOf("@");
+ int pathAtIndex = path.indexOf("@");
+ if (pathAtIndex == -1) {
+ //if the path is of local format, then its local and is co-located
+ return true;
- if (atIndex1 == -1 || atIndex2 == -1) {
- return false;
- }
+ } else if (selfAddressHostPort != null) {
+ // self-address and tx actor path, both are of remote path format
+ int slashIndex = path.indexOf("/", pathAtIndex);
+
+ if (slashIndex == -1) {
+ return false;
+ }
- int slashIndex1 = path.indexOf("/", atIndex1);
- int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+ String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+ return hostPort.equals(selfAddressHostPort);
- if (slashIndex1 == -1 || slashIndex2 == -1) {
+ } else {
+ // self address is local format and tx actor path is remote format
return false;
}
-
- String hostPort1 = path.substring(atIndex1, slashIndex1);
- String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
-
- return hostPort1.equals(hostPort2);
}
/**
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
}
@Test
- public void testIsLocalPath() {
+ public void testIsPathLocal() {
MockClusterWrapper clusterWrapper = new MockClusterWrapper();
- ActorContext actorContext =
- new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ ActorContext actorContext = null;
- clusterWrapper.setSelfAddress("");
- assertEquals(false, actorContext.isLocalPath(null));
- assertEquals(false, actorContext.isLocalPath(""));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(null));
+ assertEquals(false, actorContext.isPathLocal(""));
clusterWrapper.setSelfAddress(null);
- assertEquals(false, actorContext.isLocalPath(""));
-
- clusterWrapper.setSelfAddress("akka://test/user/$b");
- assertEquals(false, actorContext.isLocalPath("akka://test/user/$a"));
-
- clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/");
- assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
- clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550");
- assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
- clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/");
- assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
-
- clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
- assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(""));
+
+ // even if the path is in local format, match the primary path (first 3 elements) and return true
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+ // self address of remote format,but Tx path local format.
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal(
+ "akka://system/user/shardmanager/shard/transaction"));
+
+ // self address of local format,but Tx path remote format.
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(
+ "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+ //local path but not same
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+ //ip and port same
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+
+ // forward-slash missing in address
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+
+ //ips differ
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+
+ //ports differ
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
}
@Test