1 package org.opendaylight.controller.cluster.datastore.utils;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSelection;
5 import akka.actor.ActorSystem;
6 import akka.actor.Props;
7 import akka.actor.UntypedActor;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
13 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
14 import org.opendaylight.controller.cluster.datastore.Configuration;
15 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
16 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
17 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
18 import scala.concurrent.Await;
19 import scala.concurrent.Future;
20 import scala.concurrent.duration.Duration;
22 import java.util.concurrent.TimeUnit;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.mock;
28 public class ActorContextTest extends AbstractActorTest{
30 private static class MockShardManager extends UntypedActor {
32 private final boolean found;
33 private final ActorRef actorRef;
35 private MockShardManager(boolean found, ActorRef actorRef){
38 this.actorRef = actorRef;
41 @Override public void onReceive(Object message) throws Exception {
43 getSender().tell(new LocalShardFound(actorRef), getSelf());
45 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
49 private static Props props(final boolean found, final ActorRef actorRef){
50 return Props.create(new MockShardManagerCreator(found, actorRef) );
53 @SuppressWarnings("serial")
54 private static class MockShardManagerCreator implements Creator<MockShardManager> {
56 final ActorRef actorRef;
58 MockShardManagerCreator(boolean found, ActorRef actorRef) {
60 this.actorRef = actorRef;
64 public MockShardManager create() throws Exception {
65 return new MockShardManager(found, actorRef);
71 public void testFindLocalShardWithShardFound(){
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
76 protected void run() {
78 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
80 ActorRef shardManagerActorRef = getSystem()
81 .actorOf(MockShardManager.props(true, shardActorRef));
83 ActorContext actorContext =
84 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
85 mock(Configuration.class));
87 Optional<ActorRef> out = actorContext.findLocalShard("default");
89 assertEquals(shardActorRef, out.get());
100 public void testFindLocalShardWithShardNotFound(){
101 new JavaTestKit(getSystem()) {{
102 ActorRef shardManagerActorRef = getSystem()
103 .actorOf(MockShardManager.props(false, null));
105 ActorContext actorContext =
106 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
107 mock(Configuration.class));
109 Optional<ActorRef> out = actorContext.findLocalShard("default");
110 assertTrue(!out.isPresent());
116 public void testExecuteRemoteOperation() {
117 new JavaTestKit(getSystem()) {{
118 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
120 ActorRef shardManagerActorRef = getSystem()
121 .actorOf(MockShardManager.props(true, shardActorRef));
123 ActorContext actorContext =
124 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
125 mock(Configuration.class));
127 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
129 Object out = actorContext.executeOperation(actor, "hello");
131 assertEquals("hello", out);
136 public void testExecuteRemoteOperationAsync() {
137 new JavaTestKit(getSystem()) {{
138 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
140 ActorRef shardManagerActorRef = getSystem()
141 .actorOf(MockShardManager.props(true, shardActorRef));
143 ActorContext actorContext =
144 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
145 mock(Configuration.class));
147 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
149 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
152 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
153 assertEquals("Result", "hello", result);
154 } catch(Exception e) {
155 throw new AssertionError(e);
161 public void testIsLocalPath() {
162 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
163 ActorContext actorContext =
164 new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
166 clusterWrapper.setSelfAddress("");
167 assertEquals(false, actorContext.isLocalPath(null));
168 assertEquals(false, actorContext.isLocalPath(""));
170 clusterWrapper.setSelfAddress(null);
171 assertEquals(false, actorContext.isLocalPath(""));
173 clusterWrapper.setSelfAddress("akka://test/user/$b");
174 assertEquals(false, actorContext.isLocalPath("akka://test/user/$a"));
176 clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/");
177 assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
179 clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550");
180 assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
182 clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/");
183 assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
185 clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
186 assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
190 public void testResolvePathForRemoteActor() {
191 ActorContext actorContext =
192 new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
193 ClusterWrapper.class),
194 mock(Configuration.class));
196 String actual = actorContext.resolvePath(
197 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
198 "akka://system/user/shardmanager/shard/transaction");
200 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
202 assertEquals(expected, actual);
206 public void testResolvePathForLocalActor() {
207 ActorContext actorContext =
208 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
209 mock(Configuration.class));
211 String actual = actorContext.resolvePath(
212 "akka://system/user/shardmanager/shard",
213 "akka://system/user/shardmanager/shard/transaction");
215 String expected = "akka://system/user/shardmanager/shard/transaction";
217 assertEquals(expected, actual);
221 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
222 ActorContext actorContext =
223 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
224 mock(Configuration.class));
226 String actual = actorContext.resolvePath(
227 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
228 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
230 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
232 assertEquals(expected, actual);