1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.doReturn;
7 import static org.mockito.Mockito.mock;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.japi.Creator;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.typesafe.config.ConfigFactory;
18 import java.util.concurrent.TimeUnit;
19 import org.apache.commons.lang.time.StopWatch;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
22 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
23 import org.opendaylight.controller.cluster.datastore.Configuration;
24 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
32 public class ActorContextTest extends AbstractActorTest{
34 private static class MockShardManager extends UntypedActor {
36 private final boolean found;
37 private final ActorRef actorRef;
39 private MockShardManager(boolean found, ActorRef actorRef){
42 this.actorRef = actorRef;
45 @Override public void onReceive(Object message) throws Exception {
47 getSender().tell(new LocalShardFound(actorRef), getSelf());
49 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
53 private static Props props(final boolean found, final ActorRef actorRef){
54 return Props.create(new MockShardManagerCreator(found, actorRef) );
57 @SuppressWarnings("serial")
58 private static class MockShardManagerCreator implements Creator<MockShardManager> {
60 final ActorRef actorRef;
62 MockShardManagerCreator(boolean found, ActorRef actorRef) {
64 this.actorRef = actorRef;
68 public MockShardManager create() throws Exception {
69 return new MockShardManager(found, actorRef);
75 public void testFindLocalShardWithShardFound(){
76 new JavaTestKit(getSystem()) {{
78 new Within(duration("1 seconds")) {
80 protected void run() {
82 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
84 ActorRef shardManagerActorRef = getSystem()
85 .actorOf(MockShardManager.props(true, shardActorRef));
87 ActorContext actorContext =
88 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
89 mock(Configuration.class));
91 Optional<ActorRef> out = actorContext.findLocalShard("default");
93 assertEquals(shardActorRef, out.get());
104 public void testFindLocalShardWithShardNotFound(){
105 new JavaTestKit(getSystem()) {{
106 ActorRef shardManagerActorRef = getSystem()
107 .actorOf(MockShardManager.props(false, null));
109 ActorContext actorContext =
110 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
111 mock(Configuration.class));
113 Optional<ActorRef> out = actorContext.findLocalShard("default");
114 assertTrue(!out.isPresent());
120 public void testExecuteRemoteOperation() {
121 new JavaTestKit(getSystem()) {{
122 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
124 ActorRef shardManagerActorRef = getSystem()
125 .actorOf(MockShardManager.props(true, shardActorRef));
127 ActorContext actorContext =
128 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
129 mock(Configuration.class));
131 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
133 Object out = actorContext.executeOperation(actor, "hello");
135 assertEquals("hello", out);
140 public void testExecuteRemoteOperationAsync() {
141 new JavaTestKit(getSystem()) {{
142 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
144 ActorRef shardManagerActorRef = getSystem()
145 .actorOf(MockShardManager.props(true, shardActorRef));
147 ActorContext actorContext =
148 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
149 mock(Configuration.class));
151 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
153 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
156 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
157 assertEquals("Result", "hello", result);
158 } catch(Exception e) {
159 throw new AssertionError(e);
165 public void testIsPathLocal() {
166 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
167 ActorContext actorContext = null;
169 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
170 assertEquals(false, actorContext.isPathLocal(null));
171 assertEquals(false, actorContext.isPathLocal(""));
173 clusterWrapper.setSelfAddress(null);
174 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
175 assertEquals(false, actorContext.isPathLocal(""));
177 // even if the path is in local format, match the primary path (first 3 elements) and return true
178 clusterWrapper.setSelfAddress(new Address("akka", "test"));
179 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
180 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
182 clusterWrapper.setSelfAddress(new Address("akka", "test"));
183 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
184 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
186 clusterWrapper.setSelfAddress(new Address("akka", "test"));
187 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
188 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
190 // self address of remote format,but Tx path local format.
191 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
192 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
193 assertEquals(true, actorContext.isPathLocal(
194 "akka://system/user/shardmanager/shard/transaction"));
196 // self address of local format,but Tx path remote format.
197 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
198 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
199 assertEquals(false, actorContext.isPathLocal(
200 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
202 //local path but not same
203 clusterWrapper.setSelfAddress(new Address("akka", "test"));
204 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
205 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
208 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
209 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
210 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
212 // forward-slash missing in address
213 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
214 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
215 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
218 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
219 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
220 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
223 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
224 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
225 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
229 public void testResolvePathForRemoteActor() {
230 ActorContext actorContext =
231 new ActorContext(getSystem(), mock(ActorRef.class), mock(
232 ClusterWrapper.class),
233 mock(Configuration.class));
235 String actual = actorContext.resolvePath(
236 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
237 "akka://system/user/shardmanager/shard/transaction");
239 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
241 assertEquals(expected, actual);
245 public void testResolvePathForLocalActor() {
246 ActorContext actorContext =
247 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
248 mock(Configuration.class));
250 String actual = actorContext.resolvePath(
251 "akka://system/user/shardmanager/shard",
252 "akka://system/user/shardmanager/shard/transaction");
254 String expected = "akka://system/user/shardmanager/shard/transaction";
256 assertEquals(expected, actual);
260 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
261 ActorContext actorContext =
262 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
263 mock(Configuration.class));
265 String actual = actorContext.resolvePath(
266 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
267 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
269 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
271 assertEquals(expected, actual);
275 public void testRateLimiting(){
276 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
278 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
279 doReturn("config").when(mockDataStoreContext).getDataStoreType();
281 ActorContext actorContext =
282 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
283 mock(Configuration.class), mockDataStoreContext);
285 // Check that the initial value is being picked up from DataStoreContext
286 assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
288 actorContext.setTxCreationLimit(1.0);
290 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
293 StopWatch watch = new StopWatch();
297 actorContext.acquireTxCreationPermit();
298 actorContext.acquireTxCreationPermit();
299 actorContext.acquireTxCreationPermit();
303 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
307 public void testClientDispatcherIsGlobalDispatcher(){
309 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
311 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
312 doReturn("config").when(mockDataStoreContext).getDataStoreType();
314 ActorContext actorContext =
315 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
316 mock(Configuration.class), mockDataStoreContext);
318 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
323 public void testClientDispatcherIsNotGlobalDispatcher(){
325 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
327 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
328 doReturn("config").when(mockDataStoreContext).getDataStoreType();
330 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
332 ActorContext actorContext =
333 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
334 mock(Configuration.class), mockDataStoreContext);
336 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
338 actorSystem.shutdown();