X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=960e0328d6cab5142d59540df7cbcdafffff49eb;hp=fd41c49390b484fd0d4343befa2f920d542e2f74;hb=c32a09739f0f7a008fe203b7b4ca172755136307;hpb=a23ab6d60b7b57184a8fe59e282e46b448c86d6a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fd41c49390..960e0328d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,58 +1,120 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.raft.utils.EchoActor; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +public class ActorContextTest extends AbstractActorTest { + + static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class); -public class ActorContextTest extends AbstractActorTest{ + private static class TestMessage { + } private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); - private MockShardManager(boolean found, ActorRef actorRef){ + private MockShardManager(final boolean found, final ActorRef actorRef) { this.found = found; this.actorRef = actorRef; } - @Override public void onReceive(Object message) throws Exception { - if(found){ + @Override public void onReceive(final Object message) throws Exception { + if (message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if (resp == null) { + LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + + if (found) { getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf()); } } - private static Props props(final boolean found, final ActorRef actorRef){ - return Props.create(new MockShardManagerCreator(found, actorRef) ); + void addFindPrimaryResp(final String shardName, final Object resp) { + findPrimaryResponses.put(shardName, resp); + } + + private static Props props(final boolean found, final ActorRef actorRef) { + return Props.create(new MockShardManagerCreator(found, actorRef)); + } + + private static Props props() { + return Props.create(new MockShardManagerCreator()); } @SuppressWarnings("serial") @@ -60,7 +122,12 @@ public class ActorContextTest extends AbstractActorTest{ final boolean found; final ActorRef actorRef; - MockShardManagerCreator(boolean found, ActorRef actorRef) { + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + + MockShardManagerCreator(final boolean found, final ActorRef actorRef) { this.found = found; this.actorRef = actorRef; } @@ -73,93 +140,93 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testFindLocalShardWithShardFound(){ - new JavaTestKit(getSystem()) {{ - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + public void testFindLocalShardWithShardFound() { + new JavaTestKit(getSystem()) { + { + new Within(duration("1 seconds")) { + @Override + protected void run() { - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); - Optional out = actorContext.findLocalShard("default"); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - assertEquals(shardActorRef, out.get()); + Optional out = actorContext.findLocalShard("default"); + assertEquals(shardActorRef, out.get()); - expectNoMsg(); - } - }; - }}; + expectNoMsg(); + } + }; + } + }; } @Test - public void testFindLocalShardWithShardNotFound(){ - new JavaTestKit(getSystem()) {{ - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(false, null)); + public void testFindLocalShardWithShardNotFound() { + new JavaTestKit(getSystem()) { + { + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null)); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - Optional out = actorContext.findLocalShard("default"); - assertTrue(!out.isPresent()); - }}; + Optional out = actorContext.findLocalShard("default"); + assertTrue(!out.isPresent()); + } + }; } @Test public void testExecuteRemoteOperation() { - new JavaTestKit(getSystem()) {{ - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + new JavaTestKit(getSystem()) { + { + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Object out = actorContext.executeOperation(actor, "hello"); + Object out = actorContext.executeOperation(actor, "hello"); - assertEquals("hello", out); - }}; + assertEquals("hello", out); + } + }; } @Test + @SuppressWarnings("checkstyle:IllegalCatch") public void testExecuteRemoteOperationAsync() { - new JavaTestKit(getSystem()) {{ - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + new JavaTestKit(getSystem()) { + { + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Future future = actorContext.executeOperationAsync(actor, "hello"); + Future future = actorContext.executeOperationAsync(actor, "hello"); - try { - Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - assertEquals("Result", "hello", result); - } catch(Exception e) { - throw new AssertionError(e); + try { + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); + } catch (Exception e) { + throw new AssertionError(e); + } } - }}; + }; } @Test @@ -189,13 +256,13 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a")); // self address of remote format,but Tx path local format. - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550)); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); assertEquals(true, actorContext.isPathLocal( "akka://system/user/shardmanager/shard/transaction")); // self address of local format,but Tx path remote format. - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system")); + clusterWrapper.setSelfAddress(new Address("akka", "system")); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); assertEquals(false, actorContext.isPathLocal( "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction")); @@ -206,163 +273,235 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a")); //ip and port same - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550)); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); - assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/")); + assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/")); // forward-slash missing in address - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550)); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); - assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550")); + assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550")); //ips differ - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550)); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); - assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/")); + assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/")); //ports differ - clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550)); + clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550)); actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); - assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/")); + assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/")); } @Test - public void testResolvePathForRemoteActor() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock( - ClusterWrapper.class), - mock(Configuration.class)); - - String actual = actorContext.resolvePath( - "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); + public void testClientDispatcherIsGlobalDispatcher() { + ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); - String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; - - assertEquals(expected, actual); + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); } @Test - public void testResolvePathForLocalActor() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); + public void testClientDispatcherIsNotGlobalDispatcher() { + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", + ConfigFactory.load("application-with-custom-dispatchers.conf")); - String actual = actorContext.resolvePath( - "akka://system/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); + ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); - String expected = "akka://system/user/shardmanager/shard/transaction"; + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); - assertEquals(expected, actual); + actorSystem.terminate(); } @Test - public void testResolvePathForRemoteActorWithProperRemoteAddress() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); + public void testSetDatastoreContext() { + new JavaTestKit(getSystem()) { + { + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder() + .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), + new PrimaryShardInfoFutureCache()); - String actual = actorContext.resolvePath( - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); - String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6) + .shardTransactionCommitTimeoutInSeconds(8).build(); - assertEquals(expected, actual); + DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); + + actorContext.setDatastoreContext(mockContextFactory); + + expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class); + + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + } + }; } @Test - public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception { - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); + ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props()); - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder() + .logicalStoreType(LogicalDatastoreType.CONFIGURATION) + .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION; + ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) { + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion)); + } + }; - actorContext.setTxCreationLimit(1.0); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); - assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion()); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - StopWatch watch = new StopWatch(); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - watch.start(); + assertEquals(cachedInfo, actual); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); + actorContext.getPrimaryShardInfoCache().remove("foobar"); - watch.stop(); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - assertTrue("did not take as much time as expected", watch.getTime() > 1000); + assertNull(cached); } @Test - public void testClientDispatcherIsGlobalDispatcher(){ + public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception { - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props()); - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder() + .logicalStoreType(LogicalDatastoreType.CONFIGURATION) + .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); - - assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + final DataTree mockDataTree = Mockito.mock(DataTree.class); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) { + return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); + } + }; - } + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); - @Test - public void testClientDispatcherIsNotGlobalDispatcher(){ + assertNotNull(actual); + assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent()); + assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion()); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + assertEquals(cachedInfo, actual); - ActorContext actorContext = - new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + actorContext.getPrimaryShardInfoCache().remove("foobar"); - assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - actorSystem.shutdown(); + assertNull(cached); + } + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + testFindPrimaryExceptions(new PrimaryNotFoundException("not found")); } @Test - public void testSetDatastoreContext() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), - mock(Configuration.class), DatastoreContext.newBuilder(). - operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + testFindPrimaryExceptions(new NotInitializedException("not initialized")); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static void testFindPrimaryExceptions(final Object expectedException) throws Exception { + ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props()); - assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); - assertEquals("getTransactionCommitOperationTimeout", 7, - actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder() + .logicalStoreType(LogicalDatastoreType.CONFIGURATION) + .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) { + return Futures.successful(expectedException); + } + }; - DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). - shardTransactionCommitTimeoutInSeconds(8).build(); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); - actorContext.setDatastoreContext(newContext); + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected" + expectedException.getClass().toString()); + } catch (Exception e) { + if (!expectedException.getClass().isInstance(e)) { + fail("Expected Exception of type " + expectedException.getClass().toString()); + } + } - expectMsgClass(duration("5 seconds"), DatastoreContext.class); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + assertNull(cached); + } - assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); - assertEquals("getTransactionCommitOperationTimeout", 8, - actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); - }}; + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) { + { + ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props()); + ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props()); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), + MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound( + shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound( + shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig) + .getAllShardNames(); + + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), + new PrimaryShardInfoFutureCache()); + + actorContext.broadcast(v -> new TestMessage(), TestMessage.class); + + MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class); + MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class); + } + }; } }