X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=bc80937897c97104a7b8a17ac1d0cf9eff672143;hb=e8746b7ae6620d9e0dc159f2a13d3385d6197c56;hp=6bd732e038a00055bb2407ccc416c7f192059405;hpb=76601bed132933632ab6d397e71cc2aa533bf82f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6bd732e038..bc80937897 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; @@ -30,14 +34,17 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -45,10 +52,16 @@ import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); + + private static class TestMessage { + } + private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -57,6 +70,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -64,15 +89,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -287,18 +325,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + transactionCreationInitialRateLimit(155L).build(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), dataStoreContext); // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); actorContext.setTxCreationLimit(1.0); @@ -320,16 +355,9 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -337,18 +365,11 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsNotGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -388,38 +409,39 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + return Futures.successful((Object) new PrimaryFound(expPrimaryPath)); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); - ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - assertEquals(cachedSelection, actual); + assertEquals(cachedInfo, actual); // Wait for 200 Milliseconds. The cached entry should have been removed. Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); @@ -431,23 +453,20 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFound("foobar")); + return Futures.successful((Object) new PrimaryNotFoundException("not found")); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -456,10 +475,9 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); - } @Test @@ -468,23 +486,20 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new ActorNotInitialized()); + return Futures.successful((Object) new NotInitializedException("not iniislized")); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -493,10 +508,52 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + + actorContext.broadcast(new TestMessage()); + + expectFirstMatching(shardActorRef1, TestMessage.class); + expectFirstMatching(shardActorRef2, TestMessage.class); + }}; } + private T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } }