package org.opendaylight.controller.cluster.datastore.utils;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
-
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
+
+ private static class MockShardManager extends UntypedActor {
+
+ private final boolean found;
+ private final ActorRef actorRef;
+
+ private MockShardManager(boolean found, ActorRef actorRef){
+
+ this.found = found;
+ this.actorRef = actorRef;
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(found){
+ getSender().tell(new LocalShardFound(actorRef), getSelf());
+ } else {
+ getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
+ }
+ }
+
+ private static Props props(final boolean found, final ActorRef actorRef){
+ return Props.create(new MockShardManagerCreator(found, actorRef) );
+ }
+
+ @SuppressWarnings("serial")
+ private static class MockShardManagerCreator implements Creator<MockShardManager> {
+ final boolean found;
+ final ActorRef actorRef;
+
+ MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ this.found = found;
+ this.actorRef = actorRef;
+ }
+
+ @Override
+ public MockShardManager create() throws Exception {
+ return new MockShardManager(found, actorRef);
+ }
+ }
+ }
+
@Test
- public void testResolvePathForRemoteActor(){
- ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
- ClusterWrapper.class),
- mock(Configuration.class));
+ public void testFindLocalShardWithShardFound(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+
+ assertEquals(shardActorRef, out.get());
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+
+ }
+
+ @Test
+ public void testFindLocalShardWithShardNotFound(){
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(false, null));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
+ }};
+
+ }
+
+ @Test
+ public void testExecuteRemoteOperation() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+ Object out = actorContext.executeOperation(actor, "hello");
- assertEquals(expected, actual);
+ assertEquals("hello", out);
+ }};
}
@Test
- public void testResolvePathForLocalActor(){
+ public void testExecuteRemoteOperationAsync() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch(Exception e) {
+ throw new AssertionError(e);
+ }
+ }};
+ }
+
+ @Test
+ public void testIsLocalPath() {
+ MockClusterWrapper clusterWrapper = new MockClusterWrapper();
ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
+ new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+
+ clusterWrapper.setSelfAddress("");
+ assertEquals(false, actorContext.isLocalPath(null));
+ assertEquals(false, actorContext.isLocalPath(""));
+
+ clusterWrapper.setSelfAddress(null);
+ assertEquals(false, actorContext.isLocalPath(""));
+
+ clusterWrapper.setSelfAddress("akka://test/user/$b");
+ assertEquals(false, actorContext.isLocalPath("akka://test/user/$a"));
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/");
+ assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
- String expected = "akka://system/user/shardmanager/shard/transaction";
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
- assertEquals(expected, actual);
+ clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
- System.out.println(actorContext
- .actorFor("akka://system/user/shardmanager/shard/transaction"));
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
}
}