package org.opendaylight.controller.cluster.datastore.utils;
-import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
-
+import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
- @Test
- public void testResolvePathForRemoteActor(){
- ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForLocalActor(){
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
-
- System.out.println(actorContext
- .actorFor("akka://system/user/shardmanager/shard/transaction"));
- }
-
private static class MockShardManager extends UntypedActor {
}
@Test
- public void testExecuteLocalShardOperationWithShardFound(){
+ public void testFindLocalShardWithShardFound(){
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello");
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
- assertEquals("hello", out);
+ assertEquals(shardActorRef, out.get());
expectNoMsg();
}
@Test
- public void testExecuteLocalShardOperationWithShardNotFound(){
+ public void testFindLocalShardWithShardNotFound(){
new JavaTestKit(getSystem()) {{
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(false, null));
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(false, null));
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
+ }};
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ }
+
+ @Test
+ public void testExecuteRemoteOperation() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello");
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
- assertNull(out);
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- expectNoMsg();
- }
- };
- }};
+ Object out = actorContext.executeOperation(actor, "hello");
+ assertEquals("hello", out);
+ }};
}
-
@Test
- public void testFindLocalShardWithShardFound(){
+ public void testExecuteRemoteOperationAsync() {
new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.findLocalShard("default");
-
- assertEquals(shardActorRef, out);
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
- expectNoMsg();
- }
- };
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch(Exception e) {
+ throw new AssertionError(e);
+ }
}};
+ }
+ @Test
+ public void testIsPathLocal() {
+ MockClusterWrapper clusterWrapper = new MockClusterWrapper();
+ ActorContext actorContext = null;
+
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(null));
+ assertEquals(false, actorContext.isPathLocal(""));
+
+ clusterWrapper.setSelfAddress(null);
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(""));
+
+ // even if the path is in local format, match the primary path (first 3 elements) and return true
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+ // self address of remote format,but Tx path local format.
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal(
+ "akka://system/user/shardmanager/shard/transaction"));
+
+ // self address of local format,but Tx path remote format.
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(
+ "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+ //local path but not same
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+ //ip and port same
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+
+ // forward-slash missing in address
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+
+ //ips differ
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+
+ //ports differ
+ clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
}
@Test
- public void testFindLocalShardWithShardNotFound(){
- new JavaTestKit(getSystem()) {{
+ public void testResolvePathForRemoteActor() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(
+ ClusterWrapper.class),
+ mock(Configuration.class));
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(false, null));
+ String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ assertEquals(expected, actual);
+ }
- Object out = actorContext.findLocalShard("default");
+ @Test
+ public void testResolvePathForLocalActor() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
- assertNull(out);
+ String actual = actorContext.resolvePath(
+ "akka://system/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
+ String expected = "akka://system/user/shardmanager/shard/transaction";
- expectNoMsg();
- }
- };
- }};
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+ String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+ assertEquals(expected, actual);
}
@Test
- public void testExecuteRemoteOperation() {
- new JavaTestKit(getSystem()) {{
+ public void testRateLimiting(){
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ // Check that the initial value is being picked up from DataStoreContext
+ assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ actorContext.setTxCreationLimit(1.0);
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
- Object out = actorContext.executeRemoteOperation(actor, "hello");
- assertEquals("hello", out);
+ StopWatch watch = new StopWatch();
- expectNoMsg();
- }
- };
- }};
+ watch.start();
+
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+
+ watch.stop();
+
+ assertTrue("did not take as much time as expected", watch.getTime() > 1000);
}
@Test
- public void testExecuteRemoteOperationAsync() {
- new JavaTestKit(getSystem()) {{
+ public void testClientDispatcherIsGlobalDispatcher(){
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ }
- Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
+ @Test
+ public void testClientDispatcherIsNotGlobalDispatcher(){
- try {
- Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- assertEquals("Result", "hello", result);
- } catch(Exception e) {
- throw new AssertionError(e);
- }
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+ ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+ ActorContext actorContext =
+ new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
+
+ assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+ actorSystem.shutdown();
- expectNoMsg();
- }
- };
- }};
}
+
}