bc80937897c97104a7b8a17ac1d0cf9eff672143
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Await;
49 import scala.concurrent.Future;
50 import scala.concurrent.duration.Duration;
51 import scala.concurrent.duration.FiniteDuration;
52
53 public class ActorContextTest extends AbstractActorTest{
54
55     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
56
57     private static class TestMessage {
58     }
59
60     private static class MockShardManager extends UntypedActor {
61
62         private final boolean found;
63         private final ActorRef actorRef;
64         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
65
66         private MockShardManager(boolean found, ActorRef actorRef){
67
68             this.found = found;
69             this.actorRef = actorRef;
70         }
71
72         @Override public void onReceive(Object message) throws Exception {
73             if(message instanceof FindPrimary) {
74                 FindPrimary fp = (FindPrimary)message;
75                 Object resp = findPrimaryResponses.get(fp.getShardName());
76                 if(resp == null) {
77                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
78                 } else {
79                     getSender().tell(resp, getSelf());
80                 }
81
82                 return;
83             }
84
85             if(found){
86                 getSender().tell(new LocalShardFound(actorRef), getSelf());
87             } else {
88                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
89             }
90         }
91
92         void addFindPrimaryResp(String shardName, Object resp) {
93             findPrimaryResponses.put(shardName, resp);
94         }
95
96         private static Props props(final boolean found, final ActorRef actorRef){
97             return Props.create(new MockShardManagerCreator(found, actorRef) );
98         }
99
100         private static Props props(){
101             return Props.create(new MockShardManagerCreator() );
102         }
103
104         @SuppressWarnings("serial")
105         private static class MockShardManagerCreator implements Creator<MockShardManager> {
106             final boolean found;
107             final ActorRef actorRef;
108
109             MockShardManagerCreator() {
110                 this.found = false;
111                 this.actorRef = null;
112             }
113
114             MockShardManagerCreator(boolean found, ActorRef actorRef) {
115                 this.found = found;
116                 this.actorRef = actorRef;
117             }
118
119             @Override
120             public MockShardManager create() throws Exception {
121                 return new MockShardManager(found, actorRef);
122             }
123         }
124     }
125
126     @Test
127     public void testFindLocalShardWithShardFound(){
128         new JavaTestKit(getSystem()) {{
129
130             new Within(duration("1 seconds")) {
131                 @Override
132                 protected void run() {
133
134                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
135
136                     ActorRef shardManagerActorRef = getSystem()
137                         .actorOf(MockShardManager.props(true, shardActorRef));
138
139                     ActorContext actorContext =
140                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
141                             mock(Configuration.class));
142
143                     Optional<ActorRef> out = actorContext.findLocalShard("default");
144
145                     assertEquals(shardActorRef, out.get());
146
147
148                     expectNoMsg();
149                 }
150             };
151         }};
152
153     }
154
155     @Test
156     public void testFindLocalShardWithShardNotFound(){
157         new JavaTestKit(getSystem()) {{
158             ActorRef shardManagerActorRef = getSystem()
159                     .actorOf(MockShardManager.props(false, null));
160
161             ActorContext actorContext =
162                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
163                             mock(Configuration.class));
164
165             Optional<ActorRef> out = actorContext.findLocalShard("default");
166             assertTrue(!out.isPresent());
167         }};
168
169     }
170
171     @Test
172     public void testExecuteRemoteOperation() {
173         new JavaTestKit(getSystem()) {{
174             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
175
176             ActorRef shardManagerActorRef = getSystem()
177                     .actorOf(MockShardManager.props(true, shardActorRef));
178
179             ActorContext actorContext =
180                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
181                             mock(Configuration.class));
182
183             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
184
185             Object out = actorContext.executeOperation(actor, "hello");
186
187             assertEquals("hello", out);
188         }};
189     }
190
191     @Test
192     public void testExecuteRemoteOperationAsync() {
193         new JavaTestKit(getSystem()) {{
194             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
195
196             ActorRef shardManagerActorRef = getSystem()
197                     .actorOf(MockShardManager.props(true, shardActorRef));
198
199             ActorContext actorContext =
200                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
201                             mock(Configuration.class));
202
203             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
204
205             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
206
207             try {
208                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
209                 assertEquals("Result", "hello", result);
210             } catch(Exception e) {
211                 throw new AssertionError(e);
212             }
213         }};
214     }
215
216     @Test
217     public void testIsPathLocal() {
218         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
219         ActorContext actorContext = null;
220
221         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
222         assertEquals(false, actorContext.isPathLocal(null));
223         assertEquals(false, actorContext.isPathLocal(""));
224
225         clusterWrapper.setSelfAddress(null);
226         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
227         assertEquals(false, actorContext.isPathLocal(""));
228
229         // even if the path is in local format, match the primary path (first 3 elements) and return true
230         clusterWrapper.setSelfAddress(new Address("akka", "test"));
231         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
232         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
233
234         clusterWrapper.setSelfAddress(new Address("akka", "test"));
235         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
237
238         clusterWrapper.setSelfAddress(new Address("akka", "test"));
239         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
241
242         // self address of remote format,but Tx path local format.
243         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
244         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245         assertEquals(true, actorContext.isPathLocal(
246             "akka://system/user/shardmanager/shard/transaction"));
247
248         // self address of local format,but Tx path remote format.
249         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
250         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
251         assertEquals(false, actorContext.isPathLocal(
252             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
253
254         //local path but not same
255         clusterWrapper.setSelfAddress(new Address("akka", "test"));
256         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
257         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
258
259         //ip and port same
260         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
261         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
262         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
263
264         // forward-slash missing in address
265         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
266         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
267         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
268
269         //ips differ
270         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
271         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
272         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
273
274         //ports differ
275         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
276         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
277         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
278     }
279
280     @Test
281     public void testResolvePathForRemoteActor() {
282         ActorContext actorContext =
283                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
284                         ClusterWrapper.class),
285                         mock(Configuration.class));
286
287         String actual = actorContext.resolvePath(
288                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
289                 "akka://system/user/shardmanager/shard/transaction");
290
291         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
292
293         assertEquals(expected, actual);
294     }
295
296     @Test
297     public void testResolvePathForLocalActor() {
298         ActorContext actorContext =
299                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
300                         mock(Configuration.class));
301
302         String actual = actorContext.resolvePath(
303                 "akka://system/user/shardmanager/shard",
304                 "akka://system/user/shardmanager/shard/transaction");
305
306         String expected = "akka://system/user/shardmanager/shard/transaction";
307
308         assertEquals(expected, actual);
309     }
310
311     @Test
312     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
313         ActorContext actorContext =
314                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
315                         mock(Configuration.class));
316
317         String actual = actorContext.resolvePath(
318                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
319                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
320
321         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
322
323         assertEquals(expected, actual);
324     }
325
326     @Test
327     public void testRateLimiting(){
328         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
329                 transactionCreationInitialRateLimit(155L).build();
330
331         ActorContext actorContext =
332                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
333                         mock(Configuration.class), dataStoreContext);
334
335         // Check that the initial value is being picked up from DataStoreContext
336         assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
337
338         actorContext.setTxCreationLimit(1.0);
339
340         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
341
342
343         StopWatch watch = new StopWatch();
344
345         watch.start();
346
347         actorContext.acquireTxCreationPermit();
348         actorContext.acquireTxCreationPermit();
349         actorContext.acquireTxCreationPermit();
350
351         watch.stop();
352
353         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
354     }
355
356     @Test
357     public void testClientDispatcherIsGlobalDispatcher(){
358         ActorContext actorContext =
359                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
360                         mock(Configuration.class), DatastoreContext.newBuilder().build());
361
362         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
363
364     }
365
366     @Test
367     public void testClientDispatcherIsNotGlobalDispatcher(){
368         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
369
370         ActorContext actorContext =
371                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
372                         mock(Configuration.class), DatastoreContext.newBuilder().build());
373
374         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
375
376         actorSystem.shutdown();
377
378     }
379
380     @Test
381     public void testSetDatastoreContext() {
382         new JavaTestKit(getSystem()) {{
383             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
384                             mock(Configuration.class), DatastoreContext.newBuilder().
385                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
386
387             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
388             assertEquals("getTransactionCommitOperationTimeout", 7,
389                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
390
391             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
392                     shardTransactionCommitTimeoutInSeconds(8).build();
393
394             actorContext.setDatastoreContext(newContext);
395
396             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
397
398             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
399
400             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
401             assertEquals("getTransactionCommitOperationTimeout", 8,
402                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
403         }};
404     }
405
406     @Test
407     public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
408
409             TestActorRef<MessageCollectorActor> shardManager =
410                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
411
412             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
413                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
414
415             final String expPrimaryPath = "akka://test-system/find-primary-shard";
416             ActorContext actorContext =
417                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
418                             mock(Configuration.class), dataStoreContext) {
419                         @Override
420                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
421                             return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
422                         }
423                     };
424
425
426             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
427             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
428
429             assertNotNull(actual);
430             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
431             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
432                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
433
434             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
435
436             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
437
438             assertEquals(cachedInfo, actual);
439
440             // Wait for 200 Milliseconds. The cached entry should have been removed.
441
442             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
443
444             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
445
446             assertNull(cached);
447
448     }
449
450     @Test
451     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
452
453             TestActorRef<MessageCollectorActor> shardManager =
454                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
455
456             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
457                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
458
459             ActorContext actorContext =
460                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
461                             mock(Configuration.class), dataStoreContext) {
462                         @Override
463                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
464                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
465                         }
466                     };
467
468
469             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
470
471             try {
472                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
473                 fail("Expected PrimaryNotFoundException");
474             } catch(PrimaryNotFoundException e){
475
476             }
477
478             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
479
480             assertNull(cached);
481     }
482
483     @Test
484     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
485
486             TestActorRef<MessageCollectorActor> shardManager =
487                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
488
489             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
490                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
491
492             ActorContext actorContext =
493                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
494                             mock(Configuration.class), dataStoreContext) {
495                         @Override
496                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
497                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
498                         }
499                     };
500
501
502             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
503
504             try {
505                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
506                 fail("Expected NotInitializedException");
507             } catch(NotInitializedException e){
508
509             }
510
511             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
512
513             assertNull(cached);
514     }
515
516     @Test
517     public void testBroadcast() {
518         new JavaTestKit(getSystem()) {{
519             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
520             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
521
522             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
523             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
524             shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
525             shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
526             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
527
528             Configuration mockConfig = mock(Configuration.class);
529             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
530                     when(mockConfig).getAllShardNames();
531
532             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
533                     mock(ClusterWrapper.class), mockConfig,
534                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
535
536             actorContext.broadcast(new TestMessage());
537
538             expectFirstMatching(shardActorRef1, TestMessage.class);
539             expectFirstMatching(shardActorRef2, TestMessage.class);
540         }};
541     }
542
543     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
544         int count = 5000 / 50;
545         for(int i = 0; i < count; i++) {
546             try {
547                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
548                 if(message != null) {
549                     return message;
550                 }
551             } catch (Exception e) {}
552
553             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
554         }
555
556         Assert.fail("Did not receive message of type " + clazz);
557         return null;
558     }
559 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.