Bug 2194: Modify FindPrimary to check for leader
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.Duration;
52 import scala.concurrent.duration.FiniteDuration;
53
54 public class ActorContextTest extends AbstractActorTest{
55
56     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
57
58     private static class TestMessage {
59     }
60
61     private static class MockShardManager extends UntypedActor {
62
63         private final boolean found;
64         private final ActorRef actorRef;
65         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
66
67         private MockShardManager(boolean found, ActorRef actorRef){
68
69             this.found = found;
70             this.actorRef = actorRef;
71         }
72
73         @Override public void onReceive(Object message) throws Exception {
74             if(message instanceof FindPrimary) {
75                 FindPrimary fp = (FindPrimary)message;
76                 Object resp = findPrimaryResponses.get(fp.getShardName());
77                 if(resp == null) {
78                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
79                 } else {
80                     getSender().tell(resp, getSelf());
81                 }
82
83                 return;
84             }
85
86             if(found){
87                 getSender().tell(new LocalShardFound(actorRef), getSelf());
88             } else {
89                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
90             }
91         }
92
93         void addFindPrimaryResp(String shardName, Object resp) {
94             findPrimaryResponses.put(shardName, resp);
95         }
96
97         private static Props props(final boolean found, final ActorRef actorRef){
98             return Props.create(new MockShardManagerCreator(found, actorRef) );
99         }
100
101         private static Props props(){
102             return Props.create(new MockShardManagerCreator() );
103         }
104
105         @SuppressWarnings("serial")
106         private static class MockShardManagerCreator implements Creator<MockShardManager> {
107             final boolean found;
108             final ActorRef actorRef;
109
110             MockShardManagerCreator() {
111                 this.found = false;
112                 this.actorRef = null;
113             }
114
115             MockShardManagerCreator(boolean found, ActorRef actorRef) {
116                 this.found = found;
117                 this.actorRef = actorRef;
118             }
119
120             @Override
121             public MockShardManager create() throws Exception {
122                 return new MockShardManager(found, actorRef);
123             }
124         }
125     }
126
127     @Test
128     public void testFindLocalShardWithShardFound(){
129         new JavaTestKit(getSystem()) {{
130
131             new Within(duration("1 seconds")) {
132                 @Override
133                 protected void run() {
134
135                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
136
137                     ActorRef shardManagerActorRef = getSystem()
138                         .actorOf(MockShardManager.props(true, shardActorRef));
139
140                     ActorContext actorContext =
141                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
142                             mock(Configuration.class));
143
144                     Optional<ActorRef> out = actorContext.findLocalShard("default");
145
146                     assertEquals(shardActorRef, out.get());
147
148
149                     expectNoMsg();
150                 }
151             };
152         }};
153
154     }
155
156     @Test
157     public void testFindLocalShardWithShardNotFound(){
158         new JavaTestKit(getSystem()) {{
159             ActorRef shardManagerActorRef = getSystem()
160                     .actorOf(MockShardManager.props(false, null));
161
162             ActorContext actorContext =
163                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
164                             mock(Configuration.class));
165
166             Optional<ActorRef> out = actorContext.findLocalShard("default");
167             assertTrue(!out.isPresent());
168         }};
169
170     }
171
172     @Test
173     public void testExecuteRemoteOperation() {
174         new JavaTestKit(getSystem()) {{
175             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
176
177             ActorRef shardManagerActorRef = getSystem()
178                     .actorOf(MockShardManager.props(true, shardActorRef));
179
180             ActorContext actorContext =
181                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
182                             mock(Configuration.class));
183
184             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
185
186             Object out = actorContext.executeOperation(actor, "hello");
187
188             assertEquals("hello", out);
189         }};
190     }
191
192     @Test
193     public void testExecuteRemoteOperationAsync() {
194         new JavaTestKit(getSystem()) {{
195             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
196
197             ActorRef shardManagerActorRef = getSystem()
198                     .actorOf(MockShardManager.props(true, shardActorRef));
199
200             ActorContext actorContext =
201                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
202                             mock(Configuration.class));
203
204             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
205
206             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
207
208             try {
209                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
210                 assertEquals("Result", "hello", result);
211             } catch(Exception e) {
212                 throw new AssertionError(e);
213             }
214         }};
215     }
216
217     @Test
218     public void testIsPathLocal() {
219         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
220         ActorContext actorContext = null;
221
222         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
223         assertEquals(false, actorContext.isPathLocal(null));
224         assertEquals(false, actorContext.isPathLocal(""));
225
226         clusterWrapper.setSelfAddress(null);
227         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
228         assertEquals(false, actorContext.isPathLocal(""));
229
230         // even if the path is in local format, match the primary path (first 3 elements) and return true
231         clusterWrapper.setSelfAddress(new Address("akka", "test"));
232         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
233         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
234
235         clusterWrapper.setSelfAddress(new Address("akka", "test"));
236         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
237         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
238
239         clusterWrapper.setSelfAddress(new Address("akka", "test"));
240         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
241         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
242
243         // self address of remote format,but Tx path local format.
244         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
245         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
246         assertEquals(true, actorContext.isPathLocal(
247             "akka://system/user/shardmanager/shard/transaction"));
248
249         // self address of local format,but Tx path remote format.
250         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
251         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252         assertEquals(false, actorContext.isPathLocal(
253             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
254
255         //local path but not same
256         clusterWrapper.setSelfAddress(new Address("akka", "test"));
257         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
259
260         //ip and port same
261         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
262         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
264
265         // forward-slash missing in address
266         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
267         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
268         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
269
270         //ips differ
271         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
272         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
273         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
274
275         //ports differ
276         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
277         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
278         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
279     }
280
281     @Test
282     public void testResolvePathForRemoteActor() {
283         ActorContext actorContext =
284                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
285                         ClusterWrapper.class),
286                         mock(Configuration.class));
287
288         String actual = actorContext.resolvePath(
289                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
290                 "akka://system/user/shardmanager/shard/transaction");
291
292         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
293
294         assertEquals(expected, actual);
295     }
296
297     @Test
298     public void testResolvePathForLocalActor() {
299         ActorContext actorContext =
300                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
301                         mock(Configuration.class));
302
303         String actual = actorContext.resolvePath(
304                 "akka://system/user/shardmanager/shard",
305                 "akka://system/user/shardmanager/shard/transaction");
306
307         String expected = "akka://system/user/shardmanager/shard/transaction";
308
309         assertEquals(expected, actual);
310     }
311
312     @Test
313     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
314         ActorContext actorContext =
315                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
316                         mock(Configuration.class));
317
318         String actual = actorContext.resolvePath(
319                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
320                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
321
322         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
323
324         assertEquals(expected, actual);
325     }
326
327     @Test
328     public void testRateLimiting(){
329         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
330                 transactionCreationInitialRateLimit(155L).build();
331
332         ActorContext actorContext =
333                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
334                         mock(Configuration.class), dataStoreContext);
335
336         // Check that the initial value is being picked up from DataStoreContext
337         assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
338
339         actorContext.setTxCreationLimit(1.0);
340
341         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
342
343
344         StopWatch watch = new StopWatch();
345
346         watch.start();
347
348         actorContext.acquireTxCreationPermit();
349         actorContext.acquireTxCreationPermit();
350         actorContext.acquireTxCreationPermit();
351
352         watch.stop();
353
354         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
355     }
356
357     @Test
358     public void testClientDispatcherIsGlobalDispatcher(){
359         ActorContext actorContext =
360                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
361                         mock(Configuration.class), DatastoreContext.newBuilder().build());
362
363         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
364
365     }
366
367     @Test
368     public void testClientDispatcherIsNotGlobalDispatcher(){
369         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
370
371         ActorContext actorContext =
372                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
373                         mock(Configuration.class), DatastoreContext.newBuilder().build());
374
375         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
376
377         actorSystem.shutdown();
378
379     }
380
381     @Test
382     public void testSetDatastoreContext() {
383         new JavaTestKit(getSystem()) {{
384             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
385                             mock(Configuration.class), DatastoreContext.newBuilder().
386                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
387
388             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
389             assertEquals("getTransactionCommitOperationTimeout", 7,
390                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
391
392             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
393                     shardTransactionCommitTimeoutInSeconds(8).build();
394
395             actorContext.setDatastoreContext(newContext);
396
397             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
398
399             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
400
401             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
402             assertEquals("getTransactionCommitOperationTimeout", 8,
403                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
404         }};
405     }
406
407     @Test
408     public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
409
410             TestActorRef<MessageCollectorActor> shardManager =
411                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
412
413             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
414                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
415
416             ActorContext actorContext =
417                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
418                             mock(Configuration.class), dataStoreContext) {
419                         @Override
420                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
421                             return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
422                         }
423                     };
424
425
426             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
427             ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
428
429             assertNotNull(actual);
430
431             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
432
433             ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
434
435             assertEquals(cachedSelection, actual);
436
437             // Wait for 200 Milliseconds. The cached entry should have been removed.
438
439             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
440
441             cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
442
443             assertNull(cached);
444
445     }
446
447     @Test
448     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
449
450             TestActorRef<MessageCollectorActor> shardManager =
451                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
452
453             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
454                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
455
456             ActorContext actorContext =
457                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
458                             mock(Configuration.class), dataStoreContext) {
459                         @Override
460                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
461                             return Futures.successful((Object) new PrimaryNotFound("foobar"));
462                         }
463                     };
464
465
466             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
467
468             try {
469                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
470                 fail("Expected PrimaryNotFoundException");
471             } catch(PrimaryNotFoundException e){
472
473             }
474
475             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
476
477             assertNull(cached);
478     }
479
480     @Test
481     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
482
483             TestActorRef<MessageCollectorActor> shardManager =
484                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
485
486             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
487                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
488
489             ActorContext actorContext =
490                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
491                             mock(Configuration.class), dataStoreContext) {
492                         @Override
493                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
494                             return Futures.successful((Object) new ActorNotInitialized());
495                         }
496                     };
497
498
499             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
500
501             try {
502                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
503                 fail("Expected NotInitializedException");
504             } catch(NotInitializedException e){
505
506             }
507
508             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
509
510             assertNull(cached);
511     }
512
513     @Test
514     public void testBroadcast() {
515         new JavaTestKit(getSystem()) {{
516             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
517             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
518
519             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
520             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
521             shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
522             shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
523             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
524
525             Configuration mockConfig = mock(Configuration.class);
526             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
527                     when(mockConfig).getAllShardNames();
528
529             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
530                     mock(ClusterWrapper.class), mockConfig,
531                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
532
533             actorContext.broadcast(new TestMessage());
534
535             expectFirstMatching(shardActorRef1, TestMessage.class);
536             expectFirstMatching(shardActorRef2, TestMessage.class);
537         }};
538     }
539
540     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
541         int count = 5000 / 50;
542         for(int i = 0; i < count; i++) {
543             try {
544                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
545                 if(message != null) {
546                     return message;
547                 }
548             } catch (Exception e) {}
549
550             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
551         }
552
553         Assert.fail("Did not receive message of type " + clazz);
554         return null;
555     }
556 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.