1 package org.opendaylight.controller.cluster.datastore.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
29 public class ActorContextTest extends AbstractActorTest{
31 private static class MockShardManager extends UntypedActor {
33 private final boolean found;
34 private final ActorRef actorRef;
36 private MockShardManager(boolean found, ActorRef actorRef){
39 this.actorRef = actorRef;
42 @Override public void onReceive(Object message) throws Exception {
44 getSender().tell(new LocalShardFound(actorRef), getSelf());
46 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
50 private static Props props(final boolean found, final ActorRef actorRef){
51 return Props.create(new MockShardManagerCreator(found, actorRef) );
54 @SuppressWarnings("serial")
55 private static class MockShardManagerCreator implements Creator<MockShardManager> {
57 final ActorRef actorRef;
59 MockShardManagerCreator(boolean found, ActorRef actorRef) {
61 this.actorRef = actorRef;
65 public MockShardManager create() throws Exception {
66 return new MockShardManager(found, actorRef);
72 public void testFindLocalShardWithShardFound(){
73 new JavaTestKit(getSystem()) {{
75 new Within(duration("1 seconds")) {
77 protected void run() {
79 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
81 ActorRef shardManagerActorRef = getSystem()
82 .actorOf(MockShardManager.props(true, shardActorRef));
84 ActorContext actorContext =
85 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
86 mock(Configuration.class));
88 Optional<ActorRef> out = actorContext.findLocalShard("default");
90 assertEquals(shardActorRef, out.get());
101 public void testFindLocalShardWithShardNotFound(){
102 new JavaTestKit(getSystem()) {{
103 ActorRef shardManagerActorRef = getSystem()
104 .actorOf(MockShardManager.props(false, null));
106 ActorContext actorContext =
107 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
108 mock(Configuration.class));
110 Optional<ActorRef> out = actorContext.findLocalShard("default");
111 assertTrue(!out.isPresent());
117 public void testExecuteRemoteOperation() {
118 new JavaTestKit(getSystem()) {{
119 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
121 ActorRef shardManagerActorRef = getSystem()
122 .actorOf(MockShardManager.props(true, shardActorRef));
124 ActorContext actorContext =
125 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
126 mock(Configuration.class));
128 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
130 Object out = actorContext.executeOperation(actor, "hello");
132 assertEquals("hello", out);
137 public void testExecuteRemoteOperationAsync() {
138 new JavaTestKit(getSystem()) {{
139 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
141 ActorRef shardManagerActorRef = getSystem()
142 .actorOf(MockShardManager.props(true, shardActorRef));
144 ActorContext actorContext =
145 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
146 mock(Configuration.class));
148 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
150 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
153 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
154 assertEquals("Result", "hello", result);
155 } catch(Exception e) {
156 throw new AssertionError(e);
162 public void testIsPathLocal() {
163 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
164 ActorContext actorContext = null;
166 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
167 assertEquals(false, actorContext.isPathLocal(null));
168 assertEquals(false, actorContext.isPathLocal(""));
170 clusterWrapper.setSelfAddress(null);
171 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
172 assertEquals(false, actorContext.isPathLocal(""));
174 // even if the path is in local format, match the primary path (first 3 elements) and return true
175 clusterWrapper.setSelfAddress(new Address("akka", "test"));
176 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
177 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
179 clusterWrapper.setSelfAddress(new Address("akka", "test"));
180 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
181 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
183 clusterWrapper.setSelfAddress(new Address("akka", "test"));
184 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
185 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
187 // self address of remote format,but Tx path local format.
188 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
189 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
190 assertEquals(true, actorContext.isPathLocal(
191 "akka://system/user/shardmanager/shard/transaction"));
193 // self address of local format,but Tx path remote format.
194 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
195 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
196 assertEquals(false, actorContext.isPathLocal(
197 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
199 //local path but not same
200 clusterWrapper.setSelfAddress(new Address("akka", "test"));
201 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
202 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
205 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
206 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
207 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
209 // forward-slash missing in address
210 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
211 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
212 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
215 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
216 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
217 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
220 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
221 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
222 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
226 public void testResolvePathForRemoteActor() {
227 ActorContext actorContext =
228 new ActorContext(getSystem(), mock(ActorRef.class), mock(
229 ClusterWrapper.class),
230 mock(Configuration.class));
232 String actual = actorContext.resolvePath(
233 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
234 "akka://system/user/shardmanager/shard/transaction");
236 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
238 assertEquals(expected, actual);
242 public void testResolvePathForLocalActor() {
243 ActorContext actorContext =
244 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
245 mock(Configuration.class));
247 String actual = actorContext.resolvePath(
248 "akka://system/user/shardmanager/shard",
249 "akka://system/user/shardmanager/shard/transaction");
251 String expected = "akka://system/user/shardmanager/shard/transaction";
253 assertEquals(expected, actual);
257 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
258 ActorContext actorContext =
259 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
260 mock(Configuration.class));
262 String actual = actorContext.resolvePath(
263 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
264 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
266 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
268 assertEquals(expected, actual);
272 public void testRateLimiting(){
273 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
275 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
277 ActorContext actorContext =
278 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
279 mock(Configuration.class), mockDataStoreContext, "config");
281 // Check that the initial value is being picked up from DataStoreContext
282 assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
284 actorContext.setTxCreationLimit(1.0);
286 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
289 StopWatch watch = new StopWatch();
293 actorContext.acquireTxCreationPermit();
294 actorContext.acquireTxCreationPermit();
295 actorContext.acquireTxCreationPermit();
299 assertTrue("did not take as much time as expected", watch.getTime() > 1000);