1 package org.opendaylight.controller.cluster.datastore.utils;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSelection;
5 import akka.actor.ActorSystem;
6 import akka.actor.Props;
7 import akka.actor.UntypedActor;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
13 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
14 import org.opendaylight.controller.cluster.datastore.Configuration;
15 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
16 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
17 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
18 import scala.concurrent.Await;
19 import scala.concurrent.Future;
20 import scala.concurrent.duration.Duration;
22 import java.util.concurrent.TimeUnit;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.mock;
28 public class ActorContextTest extends AbstractActorTest{
30 public void testResolvePathForRemoteActor(){
31 ActorContext actorContext =
32 new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
33 ClusterWrapper.class),
34 mock(Configuration.class));
36 String actual = actorContext.resolvePath(
37 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
38 "akka://system/user/shardmanager/shard/transaction");
40 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
42 assertEquals(expected, actual);
46 public void testResolvePathForLocalActor(){
47 ActorContext actorContext =
48 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
49 mock(Configuration.class));
51 String actual = actorContext.resolvePath(
52 "akka://system/user/shardmanager/shard",
53 "akka://system/user/shardmanager/shard/transaction");
55 String expected = "akka://system/user/shardmanager/shard/transaction";
57 assertEquals(expected, actual);
59 System.out.println(actorContext
60 .actorFor("akka://system/user/shardmanager/shard/transaction"));
64 private static class MockShardManager extends UntypedActor {
66 private final boolean found;
67 private final ActorRef actorRef;
69 private MockShardManager(boolean found, ActorRef actorRef){
72 this.actorRef = actorRef;
75 @Override public void onReceive(Object message) throws Exception {
77 getSender().tell(new LocalShardFound(actorRef), getSelf());
79 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
83 private static Props props(final boolean found, final ActorRef actorRef){
84 return Props.create(new MockShardManagerCreator(found, actorRef) );
87 @SuppressWarnings("serial")
88 private static class MockShardManagerCreator implements Creator<MockShardManager> {
90 final ActorRef actorRef;
92 MockShardManagerCreator(boolean found, ActorRef actorRef) {
94 this.actorRef = actorRef;
98 public MockShardManager create() throws Exception {
99 return new MockShardManager(found, actorRef);
105 public void testFindLocalShardWithShardFound(){
106 new JavaTestKit(getSystem()) {{
108 new Within(duration("1 seconds")) {
110 protected void run() {
112 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
114 ActorRef shardManagerActorRef = getSystem()
115 .actorOf(MockShardManager.props(true, shardActorRef));
117 ActorContext actorContext =
118 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
119 mock(Configuration.class));
121 Optional<ActorRef> out = actorContext.findLocalShard("default");
123 assertEquals(shardActorRef, out.get());
134 public void testFindLocalShardWithShardNotFound(){
135 new JavaTestKit(getSystem()) {{
137 new Within(duration("1 seconds")) {
139 protected void run() {
141 ActorRef shardManagerActorRef = getSystem()
142 .actorOf(MockShardManager.props(false, null));
144 ActorContext actorContext =
145 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
146 mock(Configuration.class));
148 Optional<ActorRef> out = actorContext.findLocalShard("default");
149 assertTrue(!out.isPresent());
158 public void testExecuteRemoteOperation() {
159 new JavaTestKit(getSystem()) {{
161 new Within(duration("3 seconds")) {
163 protected void run() {
165 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
167 ActorRef shardManagerActorRef = getSystem()
168 .actorOf(MockShardManager.props(true, shardActorRef));
170 ActorContext actorContext =
171 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
172 mock(Configuration.class));
174 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
176 Object out = actorContext.executeOperation(actor, "hello");
178 assertEquals("hello", out);
187 public void testExecuteRemoteOperationAsync() {
188 new JavaTestKit(getSystem()) {{
190 new Within(duration("3 seconds")) {
192 protected void run() {
194 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
196 ActorRef shardManagerActorRef = getSystem()
197 .actorOf(MockShardManager.props(true, shardActorRef));
199 ActorContext actorContext =
200 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
201 mock(Configuration.class));
203 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
205 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
208 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
209 assertEquals("Result", "hello", result);
210 } catch(Exception e) {
211 throw new AssertionError(e);