Merge "Removing { } from NormalizedNodeJsonBodyWriter"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.doReturn;
7 import static org.mockito.Mockito.mock;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.japi.Creator;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.typesafe.config.ConfigFactory;
18 import java.util.concurrent.TimeUnit;
19 import org.apache.commons.lang.time.StopWatch;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
22 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
23 import org.opendaylight.controller.cluster.datastore.Configuration;
24 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
31
32 public class ActorContextTest extends AbstractActorTest{
33
34     private static class MockShardManager extends UntypedActor {
35
36         private final boolean found;
37         private final ActorRef actorRef;
38
39         private MockShardManager(boolean found, ActorRef actorRef){
40
41             this.found = found;
42             this.actorRef = actorRef;
43         }
44
45         @Override public void onReceive(Object message) throws Exception {
46             if(found){
47                 getSender().tell(new LocalShardFound(actorRef), getSelf());
48             } else {
49                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
50             }
51         }
52
53         private static Props props(final boolean found, final ActorRef actorRef){
54             return Props.create(new MockShardManagerCreator(found, actorRef) );
55         }
56
57         @SuppressWarnings("serial")
58         private static class MockShardManagerCreator implements Creator<MockShardManager> {
59             final boolean found;
60             final ActorRef actorRef;
61
62             MockShardManagerCreator(boolean found, ActorRef actorRef) {
63                 this.found = found;
64                 this.actorRef = actorRef;
65             }
66
67             @Override
68             public MockShardManager create() throws Exception {
69                 return new MockShardManager(found, actorRef);
70             }
71         }
72     }
73
74     @Test
75     public void testFindLocalShardWithShardFound(){
76         new JavaTestKit(getSystem()) {{
77
78             new Within(duration("1 seconds")) {
79                 @Override
80                 protected void run() {
81
82                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
83
84                     ActorRef shardManagerActorRef = getSystem()
85                         .actorOf(MockShardManager.props(true, shardActorRef));
86
87                     ActorContext actorContext =
88                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
89                             mock(Configuration.class));
90
91                     Optional<ActorRef> out = actorContext.findLocalShard("default");
92
93                     assertEquals(shardActorRef, out.get());
94
95
96                     expectNoMsg();
97                 }
98             };
99         }};
100
101     }
102
103     @Test
104     public void testFindLocalShardWithShardNotFound(){
105         new JavaTestKit(getSystem()) {{
106             ActorRef shardManagerActorRef = getSystem()
107                     .actorOf(MockShardManager.props(false, null));
108
109             ActorContext actorContext =
110                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
111                             mock(Configuration.class));
112
113             Optional<ActorRef> out = actorContext.findLocalShard("default");
114             assertTrue(!out.isPresent());
115         }};
116
117     }
118
119     @Test
120     public void testExecuteRemoteOperation() {
121         new JavaTestKit(getSystem()) {{
122             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
123
124             ActorRef shardManagerActorRef = getSystem()
125                     .actorOf(MockShardManager.props(true, shardActorRef));
126
127             ActorContext actorContext =
128                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
129                             mock(Configuration.class));
130
131             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
132
133             Object out = actorContext.executeOperation(actor, "hello");
134
135             assertEquals("hello", out);
136         }};
137     }
138
139     @Test
140     public void testExecuteRemoteOperationAsync() {
141         new JavaTestKit(getSystem()) {{
142             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
143
144             ActorRef shardManagerActorRef = getSystem()
145                     .actorOf(MockShardManager.props(true, shardActorRef));
146
147             ActorContext actorContext =
148                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
149                             mock(Configuration.class));
150
151             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
152
153             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
154
155             try {
156                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
157                 assertEquals("Result", "hello", result);
158             } catch(Exception e) {
159                 throw new AssertionError(e);
160             }
161         }};
162     }
163
164     @Test
165     public void testIsPathLocal() {
166         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
167         ActorContext actorContext = null;
168
169         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
170         assertEquals(false, actorContext.isPathLocal(null));
171         assertEquals(false, actorContext.isPathLocal(""));
172
173         clusterWrapper.setSelfAddress(null);
174         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
175         assertEquals(false, actorContext.isPathLocal(""));
176
177         // even if the path is in local format, match the primary path (first 3 elements) and return true
178         clusterWrapper.setSelfAddress(new Address("akka", "test"));
179         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
180         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
181
182         clusterWrapper.setSelfAddress(new Address("akka", "test"));
183         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
184         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
185
186         clusterWrapper.setSelfAddress(new Address("akka", "test"));
187         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
188         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
189
190         // self address of remote format,but Tx path local format.
191         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
192         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
193         assertEquals(true, actorContext.isPathLocal(
194             "akka://system/user/shardmanager/shard/transaction"));
195
196         // self address of local format,but Tx path remote format.
197         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
198         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
199         assertEquals(false, actorContext.isPathLocal(
200             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
201
202         //local path but not same
203         clusterWrapper.setSelfAddress(new Address("akka", "test"));
204         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
205         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
206
207         //ip and port same
208         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
209         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
210         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
211
212         // forward-slash missing in address
213         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
214         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
215         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
216
217         //ips differ
218         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
219         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
220         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
221
222         //ports differ
223         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
224         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
225         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
226     }
227
228     @Test
229     public void testResolvePathForRemoteActor() {
230         ActorContext actorContext =
231                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
232                         ClusterWrapper.class),
233                         mock(Configuration.class));
234
235         String actual = actorContext.resolvePath(
236                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
237                 "akka://system/user/shardmanager/shard/transaction");
238
239         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
240
241         assertEquals(expected, actual);
242     }
243
244     @Test
245     public void testResolvePathForLocalActor() {
246         ActorContext actorContext =
247                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
248                         mock(Configuration.class));
249
250         String actual = actorContext.resolvePath(
251                 "akka://system/user/shardmanager/shard",
252                 "akka://system/user/shardmanager/shard/transaction");
253
254         String expected = "akka://system/user/shardmanager/shard/transaction";
255
256         assertEquals(expected, actual);
257     }
258
259     @Test
260     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
261         ActorContext actorContext =
262                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
263                         mock(Configuration.class));
264
265         String actual = actorContext.resolvePath(
266                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
267                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
268
269         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
270
271         assertEquals(expected, actual);
272     }
273
274     @Test
275     public void testRateLimiting(){
276         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
277
278         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
279         doReturn("config").when(mockDataStoreContext).getDataStoreType();
280
281         ActorContext actorContext =
282                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
283                         mock(Configuration.class), mockDataStoreContext);
284
285         // Check that the initial value is being picked up from DataStoreContext
286         assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
287
288         actorContext.setTxCreationLimit(1.0);
289
290         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
291
292
293         StopWatch watch = new StopWatch();
294
295         watch.start();
296
297         actorContext.acquireTxCreationPermit();
298         actorContext.acquireTxCreationPermit();
299         actorContext.acquireTxCreationPermit();
300
301         watch.stop();
302
303         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
304     }
305
306     @Test
307     public void testClientDispatcherIsGlobalDispatcher(){
308
309         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
310
311         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
312         doReturn("config").when(mockDataStoreContext).getDataStoreType();
313
314         ActorContext actorContext =
315                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
316                         mock(Configuration.class), mockDataStoreContext);
317
318         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
319
320     }
321
322     @Test
323     public void testClientDispatcherIsNotGlobalDispatcher(){
324
325         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
326
327         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
328         doReturn("config").when(mockDataStoreContext).getDataStoreType();
329
330         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
331
332         ActorContext actorContext =
333                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
334                         mock(Configuration.class), mockDataStoreContext);
335
336         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
337
338         actorSystem.shutdown();
339
340     }
341
342 }