fd41c49390b484fd0d4343befa2f920d542e2f74
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.doReturn;
7 import static org.mockito.Mockito.mock;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.japi.Creator;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.typesafe.config.ConfigFactory;
18 import java.util.concurrent.TimeUnit;
19 import org.apache.commons.lang.time.StopWatch;
20 import org.junit.Assert;
21 import org.junit.Test;
22 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
23 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.Configuration;
25 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
26 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import scala.concurrent.Await;
30 import scala.concurrent.Future;
31 import scala.concurrent.duration.Duration;
32
33 public class ActorContextTest extends AbstractActorTest{
34
35     private static class MockShardManager extends UntypedActor {
36
37         private final boolean found;
38         private final ActorRef actorRef;
39
40         private MockShardManager(boolean found, ActorRef actorRef){
41
42             this.found = found;
43             this.actorRef = actorRef;
44         }
45
46         @Override public void onReceive(Object message) throws Exception {
47             if(found){
48                 getSender().tell(new LocalShardFound(actorRef), getSelf());
49             } else {
50                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
51             }
52         }
53
54         private static Props props(final boolean found, final ActorRef actorRef){
55             return Props.create(new MockShardManagerCreator(found, actorRef) );
56         }
57
58         @SuppressWarnings("serial")
59         private static class MockShardManagerCreator implements Creator<MockShardManager> {
60             final boolean found;
61             final ActorRef actorRef;
62
63             MockShardManagerCreator(boolean found, ActorRef actorRef) {
64                 this.found = found;
65                 this.actorRef = actorRef;
66             }
67
68             @Override
69             public MockShardManager create() throws Exception {
70                 return new MockShardManager(found, actorRef);
71             }
72         }
73     }
74
75     @Test
76     public void testFindLocalShardWithShardFound(){
77         new JavaTestKit(getSystem()) {{
78
79             new Within(duration("1 seconds")) {
80                 @Override
81                 protected void run() {
82
83                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
84
85                     ActorRef shardManagerActorRef = getSystem()
86                         .actorOf(MockShardManager.props(true, shardActorRef));
87
88                     ActorContext actorContext =
89                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
90                             mock(Configuration.class));
91
92                     Optional<ActorRef> out = actorContext.findLocalShard("default");
93
94                     assertEquals(shardActorRef, out.get());
95
96
97                     expectNoMsg();
98                 }
99             };
100         }};
101
102     }
103
104     @Test
105     public void testFindLocalShardWithShardNotFound(){
106         new JavaTestKit(getSystem()) {{
107             ActorRef shardManagerActorRef = getSystem()
108                     .actorOf(MockShardManager.props(false, null));
109
110             ActorContext actorContext =
111                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
112                             mock(Configuration.class));
113
114             Optional<ActorRef> out = actorContext.findLocalShard("default");
115             assertTrue(!out.isPresent());
116         }};
117
118     }
119
120     @Test
121     public void testExecuteRemoteOperation() {
122         new JavaTestKit(getSystem()) {{
123             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
124
125             ActorRef shardManagerActorRef = getSystem()
126                     .actorOf(MockShardManager.props(true, shardActorRef));
127
128             ActorContext actorContext =
129                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
130                             mock(Configuration.class));
131
132             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
133
134             Object out = actorContext.executeOperation(actor, "hello");
135
136             assertEquals("hello", out);
137         }};
138     }
139
140     @Test
141     public void testExecuteRemoteOperationAsync() {
142         new JavaTestKit(getSystem()) {{
143             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
144
145             ActorRef shardManagerActorRef = getSystem()
146                     .actorOf(MockShardManager.props(true, shardActorRef));
147
148             ActorContext actorContext =
149                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
150                             mock(Configuration.class));
151
152             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
153
154             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
155
156             try {
157                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
158                 assertEquals("Result", "hello", result);
159             } catch(Exception e) {
160                 throw new AssertionError(e);
161             }
162         }};
163     }
164
165     @Test
166     public void testIsPathLocal() {
167         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
168         ActorContext actorContext = null;
169
170         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
171         assertEquals(false, actorContext.isPathLocal(null));
172         assertEquals(false, actorContext.isPathLocal(""));
173
174         clusterWrapper.setSelfAddress(null);
175         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
176         assertEquals(false, actorContext.isPathLocal(""));
177
178         // even if the path is in local format, match the primary path (first 3 elements) and return true
179         clusterWrapper.setSelfAddress(new Address("akka", "test"));
180         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
181         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
182
183         clusterWrapper.setSelfAddress(new Address("akka", "test"));
184         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
185         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
186
187         clusterWrapper.setSelfAddress(new Address("akka", "test"));
188         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
189         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
190
191         // self address of remote format,but Tx path local format.
192         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
193         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
194         assertEquals(true, actorContext.isPathLocal(
195             "akka://system/user/shardmanager/shard/transaction"));
196
197         // self address of local format,but Tx path remote format.
198         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
199         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
200         assertEquals(false, actorContext.isPathLocal(
201             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
202
203         //local path but not same
204         clusterWrapper.setSelfAddress(new Address("akka", "test"));
205         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
206         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
207
208         //ip and port same
209         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
210         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
211         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
212
213         // forward-slash missing in address
214         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
215         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
216         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
217
218         //ips differ
219         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
220         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
221         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
222
223         //ports differ
224         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
225         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
227     }
228
229     @Test
230     public void testResolvePathForRemoteActor() {
231         ActorContext actorContext =
232                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
233                         ClusterWrapper.class),
234                         mock(Configuration.class));
235
236         String actual = actorContext.resolvePath(
237                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
238                 "akka://system/user/shardmanager/shard/transaction");
239
240         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
241
242         assertEquals(expected, actual);
243     }
244
245     @Test
246     public void testResolvePathForLocalActor() {
247         ActorContext actorContext =
248                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
249                         mock(Configuration.class));
250
251         String actual = actorContext.resolvePath(
252                 "akka://system/user/shardmanager/shard",
253                 "akka://system/user/shardmanager/shard/transaction");
254
255         String expected = "akka://system/user/shardmanager/shard/transaction";
256
257         assertEquals(expected, actual);
258     }
259
260     @Test
261     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
262         ActorContext actorContext =
263                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
264                         mock(Configuration.class));
265
266         String actual = actorContext.resolvePath(
267                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
268                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
269
270         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
271
272         assertEquals(expected, actual);
273     }
274
275     @Test
276     public void testRateLimiting(){
277         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
278
279         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
280         doReturn("config").when(mockDataStoreContext).getDataStoreType();
281
282         ActorContext actorContext =
283                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
284                         mock(Configuration.class), mockDataStoreContext);
285
286         // Check that the initial value is being picked up from DataStoreContext
287         assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
288
289         actorContext.setTxCreationLimit(1.0);
290
291         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
292
293
294         StopWatch watch = new StopWatch();
295
296         watch.start();
297
298         actorContext.acquireTxCreationPermit();
299         actorContext.acquireTxCreationPermit();
300         actorContext.acquireTxCreationPermit();
301
302         watch.stop();
303
304         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
305     }
306
307     @Test
308     public void testClientDispatcherIsGlobalDispatcher(){
309
310         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
311
312         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
313         doReturn("config").when(mockDataStoreContext).getDataStoreType();
314
315         ActorContext actorContext =
316                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
317                         mock(Configuration.class), mockDataStoreContext);
318
319         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
320
321     }
322
323     @Test
324     public void testClientDispatcherIsNotGlobalDispatcher(){
325
326         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
327
328         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
329         doReturn("config").when(mockDataStoreContext).getDataStoreType();
330
331         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
332
333         ActorContext actorContext =
334                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
335                         mock(Configuration.class), mockDataStoreContext);
336
337         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
338
339         actorSystem.shutdown();
340
341     }
342
343     @Test
344     public void testSetDatastoreContext() {
345         new JavaTestKit(getSystem()) {{
346             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
347                             mock(Configuration.class), DatastoreContext.newBuilder().
348                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
349
350             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
351             assertEquals("getTransactionCommitOperationTimeout", 7,
352                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
353
354             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
355                     shardTransactionCommitTimeoutInSeconds(8).build();
356
357             actorContext.setDatastoreContext(newContext);
358
359             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
360
361             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
362
363             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
364             assertEquals("getTransactionCommitOperationTimeout", 8,
365                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
366         }};
367     }
368 }