+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.datastore.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.dispatch.Futures;
import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.raft.utils.EchoActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-public class ActorContextTest extends AbstractActorTest{
+public class ActorContextTest extends AbstractActorTest {
+
+ static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
- private static class MockShardManager extends UntypedActor {
+ private static class TestMessage {
+ }
+
+ private static final class MockShardManager extends UntypedActor {
private final boolean found;
private final ActorRef actorRef;
+ private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
- private MockShardManager(boolean found, ActorRef actorRef){
+ private MockShardManager(final boolean found, final ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
}
- @Override public void onReceive(Object message) throws Exception {
- if(found){
+ @Override public void onReceive(final Object message) throws Exception {
+ if (message instanceof FindPrimary) {
+ FindPrimary fp = (FindPrimary)message;
+ Object resp = findPrimaryResponses.get(fp.getShardName());
+ if (resp == null) {
+ LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+ } else {
+ getSender().tell(resp, getSelf());
+ }
+
+ return;
+ }
+
+ if (found) {
getSender().tell(new LocalShardFound(actorRef), getSelf());
} else {
getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
}
}
- private static Props props(final boolean found, final ActorRef actorRef){
- return Props.create(new MockShardManagerCreator(found, actorRef) );
+ void addFindPrimaryResp(final String shardName, final Object resp) {
+ findPrimaryResponses.put(shardName, resp);
+ }
+
+ private static Props props(final boolean found, final ActorRef actorRef) {
+ return Props.create(new MockShardManagerCreator(found, actorRef));
+ }
+
+ private static Props props() {
+ return Props.create(new MockShardManagerCreator());
}
@SuppressWarnings("serial")
final boolean found;
final ActorRef actorRef;
- MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ MockShardManagerCreator() {
+ this.found = false;
+ this.actorRef = null;
+ }
+
+ MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
}
}
@Test
- public void testFindLocalShardWithShardFound(){
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
+ public void testFindLocalShardWithShardFound() {
+ new TestKit(getSystem()) {
+ {
+ within(duration("1 seconds"), () -> {
ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ .actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
Optional<ActorRef> out = actorContext.findLocalShard("default");
assertEquals(shardActorRef, out.get());
-
expectNoMsg();
- }
- };
- }};
+ return null;
+ });
+ }
+ };
}
@Test
- public void testFindLocalShardWithShardNotFound(){
- new JavaTestKit(getSystem()) {{
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(false, null));
+ public void testFindLocalShardWithShardNotFound() {
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
- Optional<ActorRef> out = actorContext.findLocalShard("default");
- assertTrue(!out.isPresent());
- }};
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
+ }
+ };
}
@Test
public void testExecuteRemoteOperation() {
- new JavaTestKit(getSystem()) {{
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Object out = actorContext.executeOperation(actor, "hello");
+ Object out = actorContext.executeOperation(actor, "hello");
- assertEquals("hello", out);
- }};
+ assertEquals("hello", out);
+ }
+ };
}
@Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void testExecuteRemoteOperationAsync() {
- new JavaTestKit(getSystem()) {{
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
- try {
- Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- assertEquals("Result", "hello", result);
- } catch(Exception e) {
- throw new AssertionError(e);
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
}
- }};
+ };
}
@Test
assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
// self address of remote format,but Tx path local format.
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
assertEquals(true, actorContext.isPathLocal(
"akka://system/user/shardmanager/shard/transaction"));
// self address of local format,but Tx path remote format.
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+ clusterWrapper.setSelfAddress(new Address("akka", "system"));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
assertEquals(false, actorContext.isPathLocal(
"akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
//ip and port same
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+ assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
// forward-slash missing in address
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
//ips differ
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
//ports differ
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
- }
-
- @Test
- public void testResolvePathForRemoteActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
}
@Test
- public void testResolvePathForLocalActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
+ public void testClientDispatcherIsGlobalDispatcher() {
+ ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
+ assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
}
@Test
- public void testResolvePathForRemoteActorWithProperRemoteAddress() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
+ public void testClientDispatcherIsNotGlobalDispatcher() {
+ ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
+ ConfigFactory.load("application-with-custom-dispatchers.conf"));
- String actual = actorContext.resolvePath(
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+ ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
- String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+ assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
- assertEquals(expected, actual);
+ actorSystem.terminate();
}
@Test
- public void testRateLimiting(){
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
-
- // Check that the initial value is being picked up from DataStoreContext
- assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
- actorContext.setTxCreationLimit(1.0);
-
- assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
- StopWatch watch = new StopWatch();
-
- watch.start();
-
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
-
- watch.stop();
+ public void testSetDatastoreContext() {
+ new TestKit(getSystem()) {
+ {
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
+ .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
+ new PrimaryShardInfoFutureCache());
- assertTrue("did not take as much time as expected", watch.getTime() > 1000);
- }
+ assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 7,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
- @Test
- public void testClientDispatcherIsGlobalDispatcher(){
+ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
+ .shardTransactionCommitTimeoutInSeconds(8).build();
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+ DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+ Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ actorContext.setDatastoreContext(mockContextFactory);
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
- assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+ Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+ assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 8,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ }
+ };
}
@Test
- public void testClientDispatcherIsNotGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
- ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
- ActorContext actorContext =
- new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
-
- assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- actorSystem.shutdown();
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
+ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
+ }
+ };
- }
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
- @Test
- public void testSetDatastoreContext() {
- new JavaTestKit(getSystem()) {{
- ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().
- operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
- assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
- assertEquals("getTransactionCommitOperationTimeout", 7,
- actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
- shardTransactionCommitTimeoutInSeconds(8).build();
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
- actorContext.setDatastoreContext(newContext);
+ assertEquals(cachedInfo, actual);
- expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
- Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
- assertEquals("getTransactionCommitOperationTimeout", 8,
- actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
- }};
+ assertNull(cached);
}
@Test
- public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
-
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
- }
- };
-
-
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
- ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
-
- assertNotNull(actual);
+ final DataTree mockDataTree = Mockito.mock(DataTree.class);
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+ }
+ };
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
- ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
- assertEquals(cachedSelection, actual);
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- // Wait for 200 Milliseconds. The cached entry should have been removed.
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ assertEquals(cachedInfo, actual);
- cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
- assertNull(cached);
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ assertNull(cached);
}
@Test
public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
-
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFound("foobar"));
- }
- };
-
-
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
-
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected PrimaryNotFoundException");
- } catch(PrimaryNotFoundException e){
-
- }
-
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
-
- assertNull(cached);
-
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
}
@Test
public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new ActorNotInitialized());
- }
- };
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected NotInitializedException");
- } catch(NotInitializedException e){
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected" + expectedException.getClass().toString());
+ } catch (Exception e) {
+ if (!expectedException.getClass().isInstance(e)) {
+ fail("Expected Exception of type " + expectedException.getClass().toString());
}
+ }
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
-
- assertNull(cached);
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ assertNull(cached);
}
+ @Test
+ public void testBroadcast() {
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
+ ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
+ MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
+ shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
+ shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
+ .getAllShardNames();
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
+ new PrimaryShardInfoFutureCache());
+
+ actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+
+ MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
+ }
+ };
+ }
}