X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=6b4f6337785a753e68e1330f8296927aeb70b005;hp=8426b03a37de93242b506df7fd9d4368d24a169f;hb=c9d0ae2a8ce525d150f360cf49f21b7c0187cfbf;hpb=f963866fe6355cdea954d48764d59069c9aa2e04 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 8426b03a37..6b4f633778 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,36 +1,66 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; - +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.time.StopWatch; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; +public class ActorContextTest extends AbstractActorTest{ -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); -public class ActorContextTest extends AbstractActorTest{ + private static class TestMessage { + } private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -39,6 +69,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -46,15 +88,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -99,23 +154,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testFindLocalShardWithShardNotFound(){ new JavaTestKit(getSystem()) {{ + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(false, null)); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(false, null)); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Optional out = actorContext.findLocalShard("default"); - assertTrue(!out.isPresent()); - expectNoMsg(); - } - }; + Optional out = actorContext.findLocalShard("default"); + assertTrue(!out.isPresent()); }}; } @@ -123,63 +170,385 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testExecuteRemoteOperation() { new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - new Within(duration("3 seconds")) { - @Override - protected void run() { + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + Object out = actorContext.executeOperation(actor, "hello"); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + assertEquals("hello", out); + }}; + } - Object out = actorContext.executeOperation(actor, "hello"); + @Test + public void testExecuteRemoteOperationAsync() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - assertEquals("hello", out); + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); - expectNoMsg(); - } - }; + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + + Future future = actorContext.executeOperationAsync(actor, "hello"); + + try { + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); + } catch(Exception e) { + throw new AssertionError(e); + } }}; } @Test - public void testExecuteRemoteOperationAsync() { + public void testIsPathLocal() { + MockClusterWrapper clusterWrapper = new MockClusterWrapper(); + ActorContext actorContext = null; + + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal(null)); + assertEquals(false, actorContext.isPathLocal("")); + + clusterWrapper.setSelfAddress(null); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("")); + + // even if the path is in local format, match the primary path (first 3 elements) and return true + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/$a")); + + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a")); + + // self address of remote format,but Tx path local format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal( + "akka://system/user/shardmanager/shard/transaction")); + + // self address of local format,but Tx path remote format. + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal( + "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction")); + + //local path but not same + clusterWrapper.setSelfAddress(new Address("akka", "test")); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a")); + + //ip and port same + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/")); + + // forward-slash missing in address + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550")); + + //ips differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/")); + + //ports differ + clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); + assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/")); + } + + @Test + public void testResolvePathForRemoteActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock( + ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForLocalActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka://system/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka://system/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForRemoteActorWithProperRemoteAddress() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testRateLimiting(){ + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + transactionCreationInitialRateLimit(155L).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext); + + // Check that the initial value is being picked up from DataStoreContext + assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + + actorContext.setTxCreationLimit(1.0); + + assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + + + StopWatch watch = new StopWatch(); + + watch.start(); + + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + + watch.stop(); + + assertTrue("did not take as much time as expected", watch.getTime() > 1000); + } + + @Test + public void testClientDispatcherIsGlobalDispatcher(){ + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build()); + + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + } + + @Test + public void testClientDispatcherIsNotGlobalDispatcher(){ + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + + ActorContext actorContext = + new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build()); + + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + actorSystem.shutdown(); + + } + + @Test + public void testSetDatastoreContext() { new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder(). + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); - new Within(duration("3 seconds")) { - @Override - protected void run() { + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). + shardTransactionCommitTimeoutInSeconds(8).build(); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + actorContext.setDatastoreContext(newContext); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + expectMsgClass(duration("5 seconds"), DatastoreContext.class); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); - Future future = actorContext.executeOperationAsync(actor, "hello"); + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + }}; + } - try { - Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - assertEquals("Result", "hello", result); - } catch(Exception e) { - throw new AssertionError(e); - } + @Test + public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { - expectNoMsg(); - } - }; + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedSelection, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryNotFoundException("not found")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected PrimaryNotFoundException"); + } catch(PrimaryNotFoundException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + } + + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new NotInitializedException("not iniislized")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected NotInitializedException"); + } catch(NotInitializedException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + + actorContext.broadcast(new TestMessage()); + + expectFirstMatching(shardActorRef1, TestMessage.class); + expectFirstMatching(shardActorRef2, TestMessage.class); }}; } + + private T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } }