Merge "Startup arch - feature cleanup"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index fda9ccdfdbbd44dda363c92b842d246e389466bb..3c6a0cef5c605fbb23e3c9501676f67d95805e9d 100644 (file)
@@ -1,64 +1,35 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
-import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
-
+import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
-    @Test
-    public void testResolvePathForRemoteActor(){
-        ActorContext actorContext =
-            new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
-                ClusterWrapper.class),
-                mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-            "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
-            "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testResolvePathForLocalActor(){
-        ActorContext actorContext =
-            new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-            "akka://system/user/shardmanager/shard",
-            "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka://system/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-
-        System.out.println(actorContext
-            .actorFor("akka://system/user/shardmanager/shard/transaction"));
-    }
-
 
     private static class MockShardManager extends UntypedActor {
 
@@ -101,7 +72,7 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testExecuteLocalShardOperationWithShardFound(){
+    public void testFindLocalShardWithShardFound(){
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
@@ -117,9 +88,9 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Optional<ActorRef> out = actorContext.findLocalShard("default");
 
-                    assertEquals("hello", out);
+                    assertEquals(shardActorRef, out.get());
 
 
                     expectNoMsg();
@@ -130,150 +101,242 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testExecuteLocalShardOperationWithShardNotFound(){
+    public void testFindLocalShardWithShardNotFound(){
         new JavaTestKit(getSystem()) {{
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(false, null));
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(false, null));
+            Optional<ActorRef> out = actorContext.findLocalShard("default");
+            assertTrue(!out.isPresent());
+        }};
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+    }
+
+    @Test
+    public void testExecuteRemoteOperation() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(true, shardActorRef));
 
-                    assertNull(out);
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
 
+            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    expectNoMsg();
-                }
-            };
-        }};
+            Object out = actorContext.executeOperation(actor, "hello");
 
+            assertEquals("hello", out);
+        }};
     }
 
-
     @Test
-    public void testFindLocalShardWithShardFound(){
+    public void testExecuteRemoteOperationAsync() {
         new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(true, shardActorRef));
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
-
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.findLocalShard("default");
-
-                    assertEquals(shardActorRef, out);
+            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
+            Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
 
-                    expectNoMsg();
-                }
-            };
+            try {
+                Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+                assertEquals("Result", "hello", result);
+            } catch(Exception e) {
+                throw new AssertionError(e);
+            }
         }};
+    }
 
+    @Test
+    public void testIsPathLocal() {
+        MockClusterWrapper clusterWrapper = new MockClusterWrapper();
+        ActorContext actorContext = null;
+
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(null));
+        assertEquals(false, actorContext.isPathLocal(""));
+
+        clusterWrapper.setSelfAddress(null);
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(""));
+
+        // even if the path is in local format, match the primary path (first 3 elements) and return true
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+        // self address of remote format,but Tx path local format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal(
+            "akka://system/user/shardmanager/shard/transaction"));
+
+        // self address of local format,but Tx path remote format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(
+            "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+        //local path but not same
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+        //ip and port same
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+
+        // forward-slash missing in address
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+
+        //ips differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+
+        //ports differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
     }
 
     @Test
-    public void testFindLocalShardWithShardNotFound(){
-        new JavaTestKit(getSystem()) {{
+    public void testResolvePathForRemoteActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
+                        ClusterWrapper.class),
+                        mock(Configuration.class));
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(false, null));
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+        assertEquals(expected, actual);
+    }
 
-                    Object out = actorContext.findLocalShard("default");
+    @Test
+    public void testResolvePathForLocalActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
 
-                    assertNull(out);
+        String actual = actorContext.resolvePath(
+                "akka://system/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
 
+        String expected = "akka://system/user/shardmanager/shard/transaction";
 
-                    expectNoMsg();
-                }
-            };
-        }};
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
 
+        assertEquals(expected, actual);
     }
 
     @Test
-    public void testExecuteRemoteOperation() {
-        new JavaTestKit(getSystem()) {{
+    public void testRateLimiting(){
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
 
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
+        // Check that the initial value is being picked up from DataStoreContext
+        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+        actorContext.setTxCreationLimit(1.0);
 
-                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
 
-                    Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
 
-                    assertEquals("hello", out);
+        StopWatch watch = new StopWatch();
 
-                    expectNoMsg();
-                }
-            };
-        }};
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
     }
 
     @Test
-    public void testExecuteRemoteOperationAsync() {
-        new JavaTestKit(getSystem()) {{
+    public void testClientDispatcherIsGlobalDispatcher(){
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
 
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+        assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
-                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+    }
 
-                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
-                            Duration.create(3, TimeUnit.SECONDS));
+    @Test
+    public void testClientDispatcherIsNotGlobalDispatcher(){
 
-                    try {
-                        Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
-                        assertEquals("Result", "hello", result);
-                    } catch(Exception e) {
-                        throw new AssertionError(e);
-                    }
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+        ActorContext actorContext =
+                new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+        actorSystem.shutdown();
 
-                    expectNoMsg();
-                }
-            };
-        }};
     }
+
 }