X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=405fe5221b64255e086db7b37eb1af1679e4da18;hp=eae46da2eee53bd4b2cf5ee7d2cb823e0111b6be;hb=ae11ac10dfd3579b1b685455ea642bbb08de68f1;hpb=bd8beb1bfee9f421ad8f2d07b1424b21038234a2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index eae46da2ee..405fe5221b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,37 +1,83 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.time.StopWatch; +import java.util.function.Function; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.raft.utils.EchoActor; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); + + private static class TestMessage { + } + private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -40,6 +86,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -47,15 +105,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -223,80 +294,220 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testResolvePathForRemoteActor() { + public void testClientDispatcherIsGlobalDispatcher(){ ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock( - ClusterWrapper.class), - mock(Configuration.class)); - - String actual = actorContext.resolvePath( - "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); - String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); - assertEquals(expected, actual); } @Test - public void testResolvePathForLocalActor() { + public void testClientDispatcherIsNotGlobalDispatcher(){ + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); + new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); - String actual = actorContext.resolvePath( - "akka://system/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); - String expected = "akka://system/user/shardmanager/shard/transaction"; + actorSystem.shutdown(); - assertEquals(expected, actual); } @Test - public void testResolvePathForRemoteActorWithProperRemoteAddress() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); + public void testSetDatastoreContext() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder(). + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache()); + + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). + shardTransactionCommitTimeoutInSeconds(8).build(); + + DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); - String actual = actorContext.resolvePath( - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + actorContext.setDatastoreContext(mockContextFactory); - String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class); - assertEquals(expected, actual); + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + }}; } @Test - public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception { - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). + logicalStoreType(LogicalDatastoreType.CONFIGURATION). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION; + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion)); + } + }; - actorContext.setTxCreationLimit(1.0); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); - assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion()); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - StopWatch watch = new StopWatch(); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - watch.start(); + assertEquals(cachedInfo, actual); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); + actorContext.getPrimaryShardInfoCache().remove("foobar"); - watch.stop(); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - assertTrue("did not take as much time as expected", watch.getTime() > 1000); + assertNull(cached); } + + @Test + public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). + logicalStoreType(LogicalDatastoreType.CONFIGURATION). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + final DataTree mockDataTree = Mockito.mock(DataTree.class); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); + } + }; + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent()); + assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion()); + + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedInfo, actual); + + actorContext.getPrimaryShardInfoCache().remove("foobar"); + + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + assertNull(cached); + } + + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + testFindPrimaryExceptions(new PrimaryNotFoundException("not found")); + } + + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + testFindPrimaryExceptions(new NotInitializedException("not initialized")); + } + + private void testFindPrimaryExceptions(final Object expectedException) throws Exception { + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). + logicalStoreType(LogicalDatastoreType.CONFIGURATION). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful(expectedException); + } + }; + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected" + expectedException.getClass().toString()); + } catch(Exception e){ + if(!expectedException.getClass().isInstance(e)) { + fail("Expected Exception of type " + expectedException.getClass().toString()); + } + } + + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(), + DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(), + DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache()); + + actorContext.broadcast(new Function() { + @Override + public Object apply(Short v) { + return new TestMessage(); + } + }); + + MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class); + MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class); + }}; + } + }