X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=3c6a0cef5c605fbb23e3c9501676f67d95805e9d;hp=60f9a2d9dc4d9e2660137eaa77c7df491d124361;hb=e3998d55e33da9f6ecb69da75ecc71a047b6362b;hpb=c36bb2bc93ea0fbdd3ca8b2663447fc921450aee diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 60f9a2d9dc..3c6a0cef5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,26 +1,33 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.typesafe.config.ConfigFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.time.StopWatch; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; public class ActorContextTest extends AbstractActorTest{ @@ -155,31 +162,181 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testIsLocalPath() { + public void testIsPathLocal() { MockClusterWrapper clusterWrapper = new MockClusterWrapper(); - ActorContext actorContext = - new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + ActorContext actorContext = null; - clusterWrapper.setSelfAddress(""); - assertEquals(false, actorContext.isLocalPath(null)); - assertEquals(false, actorContext.isLocalPath("")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal(null)); + assertEquals(false, actorContext.isPathLocal("")); clusterWrapper.setSelfAddress(null); - assertEquals(false, actorContext.isLocalPath("")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("")); + + // even if the path is in local format, match the primary path (first 3 elements) and return true + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a")); + + // self address of remote format,but Tx path local format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal( + "akka://system/user/shardmanager/shard/transaction")); + + // self address of local format,but Tx path remote format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal( + "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction")); + + //local path but not same + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a")); + + //ip and port same + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/")); + + // forward-slash missing in address + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550")); + + //ips differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/")); + + //ports differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/")); + } + + @Test + public void testResolvePathForRemoteActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock( + ClusterWrapper.class), + mock(Configuration.class)); - clusterWrapper.setSelfAddress("akka://test/user/$b"); - assertEquals(false, actorContext.isLocalPath("akka://test/user/$a")); + String actual = actorContext.resolvePath( + "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/"); - assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + assertEquals(expected, actual); + } - clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + @Test + public void testResolvePathForLocalActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); - clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/"); - assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + String actual = actorContext.resolvePath( + "akka://system/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka://system/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); } + + @Test + public void testResolvePathForRemoteActorWithProperRemoteAddress() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testRateLimiting(){ + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + // Check that the initial value is being picked up from DataStoreContext + assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + + actorContext.setTxCreationLimit(1.0); + + assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + + + StopWatch watch = new StopWatch(); + + watch.start(); + + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + + watch.stop(); + + assertTrue("did not take as much time as expected", watch.getTime() > 1000); + } + + @Test + public void testClientDispatcherIsGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + } + + @Test + public void testClientDispatcherIsNotGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + + ActorContext actorContext = + new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + actorSystem.shutdown(); + + } + }