BUG 3045 : Use non-strict parsing in hello message.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import scala.concurrent.Await;
48 import scala.concurrent.Future;
49 import scala.concurrent.duration.Duration;
50 import scala.concurrent.duration.FiniteDuration;
51
52 public class ActorContextTest extends AbstractActorTest{
53
54     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
55
56     private static class TestMessage {
57     }
58
59     private static class MockShardManager extends UntypedActor {
60
61         private final boolean found;
62         private final ActorRef actorRef;
63         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
64
65         private MockShardManager(boolean found, ActorRef actorRef){
66
67             this.found = found;
68             this.actorRef = actorRef;
69         }
70
71         @Override public void onReceive(Object message) throws Exception {
72             if(message instanceof FindPrimary) {
73                 FindPrimary fp = (FindPrimary)message;
74                 Object resp = findPrimaryResponses.get(fp.getShardName());
75                 if(resp == null) {
76                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
77                 } else {
78                     getSender().tell(resp, getSelf());
79                 }
80
81                 return;
82             }
83
84             if(found){
85                 getSender().tell(new LocalShardFound(actorRef), getSelf());
86             } else {
87                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
88             }
89         }
90
91         void addFindPrimaryResp(String shardName, Object resp) {
92             findPrimaryResponses.put(shardName, resp);
93         }
94
95         private static Props props(final boolean found, final ActorRef actorRef){
96             return Props.create(new MockShardManagerCreator(found, actorRef) );
97         }
98
99         private static Props props(){
100             return Props.create(new MockShardManagerCreator() );
101         }
102
103         @SuppressWarnings("serial")
104         private static class MockShardManagerCreator implements Creator<MockShardManager> {
105             final boolean found;
106             final ActorRef actorRef;
107
108             MockShardManagerCreator() {
109                 this.found = false;
110                 this.actorRef = null;
111             }
112
113             MockShardManagerCreator(boolean found, ActorRef actorRef) {
114                 this.found = found;
115                 this.actorRef = actorRef;
116             }
117
118             @Override
119             public MockShardManager create() throws Exception {
120                 return new MockShardManager(found, actorRef);
121             }
122         }
123     }
124
125     @Test
126     public void testFindLocalShardWithShardFound(){
127         new JavaTestKit(getSystem()) {{
128
129             new Within(duration("1 seconds")) {
130                 @Override
131                 protected void run() {
132
133                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
134
135                     ActorRef shardManagerActorRef = getSystem()
136                         .actorOf(MockShardManager.props(true, shardActorRef));
137
138                     ActorContext actorContext =
139                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
140                             mock(Configuration.class));
141
142                     Optional<ActorRef> out = actorContext.findLocalShard("default");
143
144                     assertEquals(shardActorRef, out.get());
145
146
147                     expectNoMsg();
148                 }
149             };
150         }};
151
152     }
153
154     @Test
155     public void testFindLocalShardWithShardNotFound(){
156         new JavaTestKit(getSystem()) {{
157             ActorRef shardManagerActorRef = getSystem()
158                     .actorOf(MockShardManager.props(false, null));
159
160             ActorContext actorContext =
161                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
162                             mock(Configuration.class));
163
164             Optional<ActorRef> out = actorContext.findLocalShard("default");
165             assertTrue(!out.isPresent());
166         }};
167
168     }
169
170     @Test
171     public void testExecuteRemoteOperation() {
172         new JavaTestKit(getSystem()) {{
173             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
174
175             ActorRef shardManagerActorRef = getSystem()
176                     .actorOf(MockShardManager.props(true, shardActorRef));
177
178             ActorContext actorContext =
179                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
180                             mock(Configuration.class));
181
182             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
183
184             Object out = actorContext.executeOperation(actor, "hello");
185
186             assertEquals("hello", out);
187         }};
188     }
189
190     @Test
191     public void testExecuteRemoteOperationAsync() {
192         new JavaTestKit(getSystem()) {{
193             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
194
195             ActorRef shardManagerActorRef = getSystem()
196                     .actorOf(MockShardManager.props(true, shardActorRef));
197
198             ActorContext actorContext =
199                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
200                             mock(Configuration.class));
201
202             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
203
204             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
205
206             try {
207                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
208                 assertEquals("Result", "hello", result);
209             } catch(Exception e) {
210                 throw new AssertionError(e);
211             }
212         }};
213     }
214
215     @Test
216     public void testIsPathLocal() {
217         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
218         ActorContext actorContext = null;
219
220         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
221         assertEquals(false, actorContext.isPathLocal(null));
222         assertEquals(false, actorContext.isPathLocal(""));
223
224         clusterWrapper.setSelfAddress(null);
225         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertEquals(false, actorContext.isPathLocal(""));
227
228         // even if the path is in local format, match the primary path (first 3 elements) and return true
229         clusterWrapper.setSelfAddress(new Address("akka", "test"));
230         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
232
233         clusterWrapper.setSelfAddress(new Address("akka", "test"));
234         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
236
237         clusterWrapper.setSelfAddress(new Address("akka", "test"));
238         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
240
241         // self address of remote format,but Tx path local format.
242         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
243         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244         assertEquals(true, actorContext.isPathLocal(
245             "akka://system/user/shardmanager/shard/transaction"));
246
247         // self address of local format,but Tx path remote format.
248         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
249         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
250         assertEquals(false, actorContext.isPathLocal(
251             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
252
253         //local path but not same
254         clusterWrapper.setSelfAddress(new Address("akka", "test"));
255         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
256         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
257
258         //ip and port same
259         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
260         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
262
263         // forward-slash missing in address
264         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
265         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
267
268         //ips differ
269         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
270         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
272
273         //ports differ
274         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
277     }
278
279     @Test
280     public void testResolvePathForRemoteActor() {
281         ActorContext actorContext =
282                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
283                         ClusterWrapper.class),
284                         mock(Configuration.class));
285
286         String actual = actorContext.resolvePath(
287                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
288                 "akka://system/user/shardmanager/shard/transaction");
289
290         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
291
292         assertEquals(expected, actual);
293     }
294
295     @Test
296     public void testResolvePathForLocalActor() {
297         ActorContext actorContext =
298                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
299                         mock(Configuration.class));
300
301         String actual = actorContext.resolvePath(
302                 "akka://system/user/shardmanager/shard",
303                 "akka://system/user/shardmanager/shard/transaction");
304
305         String expected = "akka://system/user/shardmanager/shard/transaction";
306
307         assertEquals(expected, actual);
308     }
309
310     @Test
311     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
312         ActorContext actorContext =
313                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
314                         mock(Configuration.class));
315
316         String actual = actorContext.resolvePath(
317                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
318                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
319
320         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
321
322         assertEquals(expected, actual);
323     }
324
325     @Test
326     public void testRateLimiting(){
327         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
328                 transactionCreationInitialRateLimit(155L).build();
329
330         ActorContext actorContext =
331                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
332                         mock(Configuration.class), dataStoreContext);
333
334         // Check that the initial value is being picked up from DataStoreContext
335         assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
336
337         actorContext.setTxCreationLimit(1.0);
338
339         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
340
341
342         StopWatch watch = new StopWatch();
343
344         watch.start();
345
346         actorContext.acquireTxCreationPermit();
347         actorContext.acquireTxCreationPermit();
348         actorContext.acquireTxCreationPermit();
349
350         watch.stop();
351
352         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
353     }
354
355     @Test
356     public void testClientDispatcherIsGlobalDispatcher(){
357         ActorContext actorContext =
358                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
359                         mock(Configuration.class), DatastoreContext.newBuilder().build());
360
361         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
362
363     }
364
365     @Test
366     public void testClientDispatcherIsNotGlobalDispatcher(){
367         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
368
369         ActorContext actorContext =
370                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
371                         mock(Configuration.class), DatastoreContext.newBuilder().build());
372
373         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
374
375         actorSystem.shutdown();
376
377     }
378
379     @Test
380     public void testSetDatastoreContext() {
381         new JavaTestKit(getSystem()) {{
382             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
383                             mock(Configuration.class), DatastoreContext.newBuilder().
384                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
385
386             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
387             assertEquals("getTransactionCommitOperationTimeout", 7,
388                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
389
390             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
391                     shardTransactionCommitTimeoutInSeconds(8).build();
392
393             actorContext.setDatastoreContext(newContext);
394
395             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
396
397             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
398
399             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
400             assertEquals("getTransactionCommitOperationTimeout", 8,
401                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
402         }};
403     }
404
405     @Test
406     public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
407
408             TestActorRef<MessageCollectorActor> shardManager =
409                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
410
411             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
412                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
413
414             ActorContext actorContext =
415                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
416                             mock(Configuration.class), dataStoreContext) {
417                         @Override
418                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
419                             return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
420                         }
421                     };
422
423
424             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
425             ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
426
427             assertNotNull(actual);
428
429             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
430
431             ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
432
433             assertEquals(cachedSelection, actual);
434
435             // Wait for 200 Milliseconds. The cached entry should have been removed.
436
437             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
438
439             cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
440
441             assertNull(cached);
442
443     }
444
445     @Test
446     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
447
448             TestActorRef<MessageCollectorActor> shardManager =
449                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
450
451             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
452                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
453
454             ActorContext actorContext =
455                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
456                             mock(Configuration.class), dataStoreContext) {
457                         @Override
458                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
459                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
460                         }
461                     };
462
463
464             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
465
466             try {
467                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
468                 fail("Expected PrimaryNotFoundException");
469             } catch(PrimaryNotFoundException e){
470
471             }
472
473             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
474
475             assertNull(cached);
476     }
477
478     @Test
479     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
480
481             TestActorRef<MessageCollectorActor> shardManager =
482                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
483
484             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
485                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
486
487             ActorContext actorContext =
488                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
489                             mock(Configuration.class), dataStoreContext) {
490                         @Override
491                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
492                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
493                         }
494                     };
495
496
497             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
498
499             try {
500                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
501                 fail("Expected NotInitializedException");
502             } catch(NotInitializedException e){
503
504             }
505
506             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
507
508             assertNull(cached);
509     }
510
511     @Test
512     public void testBroadcast() {
513         new JavaTestKit(getSystem()) {{
514             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
515             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
516
517             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
518             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
519             shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
520             shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
521             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
522
523             Configuration mockConfig = mock(Configuration.class);
524             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
525                     when(mockConfig).getAllShardNames();
526
527             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
528                     mock(ClusterWrapper.class), mockConfig,
529                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
530
531             actorContext.broadcast(new TestMessage());
532
533             expectFirstMatching(shardActorRef1, TestMessage.class);
534             expectFirstMatching(shardActorRef2, TestMessage.class);
535         }};
536     }
537
538     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
539         int count = 5000 / 50;
540         for(int i = 0; i < count; i++) {
541             try {
542                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
543                 if(message != null) {
544                     return message;
545                 }
546             } catch (Exception e) {}
547
548             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
549         }
550
551         Assert.fail("Did not receive message of type " + clazz);
552         return null;
553     }
554 }