Merge "Add LeaderStateChanged notification"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.lang.time.StopWatch;
27 import org.junit.Assert;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
36 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
37 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
40 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
45
46 public class ActorContextTest extends AbstractActorTest{
47
48     private static class MockShardManager extends UntypedActor {
49
50         private final boolean found;
51         private final ActorRef actorRef;
52
53         private MockShardManager(boolean found, ActorRef actorRef){
54
55             this.found = found;
56             this.actorRef = actorRef;
57         }
58
59         @Override public void onReceive(Object message) throws Exception {
60             if(found){
61                 getSender().tell(new LocalShardFound(actorRef), getSelf());
62             } else {
63                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
64             }
65         }
66
67         private static Props props(final boolean found, final ActorRef actorRef){
68             return Props.create(new MockShardManagerCreator(found, actorRef) );
69         }
70
71         @SuppressWarnings("serial")
72         private static class MockShardManagerCreator implements Creator<MockShardManager> {
73             final boolean found;
74             final ActorRef actorRef;
75
76             MockShardManagerCreator(boolean found, ActorRef actorRef) {
77                 this.found = found;
78                 this.actorRef = actorRef;
79             }
80
81             @Override
82             public MockShardManager create() throws Exception {
83                 return new MockShardManager(found, actorRef);
84             }
85         }
86     }
87
88     @Test
89     public void testFindLocalShardWithShardFound(){
90         new JavaTestKit(getSystem()) {{
91
92             new Within(duration("1 seconds")) {
93                 @Override
94                 protected void run() {
95
96                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
97
98                     ActorRef shardManagerActorRef = getSystem()
99                         .actorOf(MockShardManager.props(true, shardActorRef));
100
101                     ActorContext actorContext =
102                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
103                             mock(Configuration.class));
104
105                     Optional<ActorRef> out = actorContext.findLocalShard("default");
106
107                     assertEquals(shardActorRef, out.get());
108
109
110                     expectNoMsg();
111                 }
112             };
113         }};
114
115     }
116
117     @Test
118     public void testFindLocalShardWithShardNotFound(){
119         new JavaTestKit(getSystem()) {{
120             ActorRef shardManagerActorRef = getSystem()
121                     .actorOf(MockShardManager.props(false, null));
122
123             ActorContext actorContext =
124                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
125                             mock(Configuration.class));
126
127             Optional<ActorRef> out = actorContext.findLocalShard("default");
128             assertTrue(!out.isPresent());
129         }};
130
131     }
132
133     @Test
134     public void testExecuteRemoteOperation() {
135         new JavaTestKit(getSystem()) {{
136             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
137
138             ActorRef shardManagerActorRef = getSystem()
139                     .actorOf(MockShardManager.props(true, shardActorRef));
140
141             ActorContext actorContext =
142                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
143                             mock(Configuration.class));
144
145             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
146
147             Object out = actorContext.executeOperation(actor, "hello");
148
149             assertEquals("hello", out);
150         }};
151     }
152
153     @Test
154     public void testExecuteRemoteOperationAsync() {
155         new JavaTestKit(getSystem()) {{
156             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
157
158             ActorRef shardManagerActorRef = getSystem()
159                     .actorOf(MockShardManager.props(true, shardActorRef));
160
161             ActorContext actorContext =
162                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
163                             mock(Configuration.class));
164
165             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
166
167             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
168
169             try {
170                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
171                 assertEquals("Result", "hello", result);
172             } catch(Exception e) {
173                 throw new AssertionError(e);
174             }
175         }};
176     }
177
178     @Test
179     public void testIsPathLocal() {
180         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
181         ActorContext actorContext = null;
182
183         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
184         assertEquals(false, actorContext.isPathLocal(null));
185         assertEquals(false, actorContext.isPathLocal(""));
186
187         clusterWrapper.setSelfAddress(null);
188         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
189         assertEquals(false, actorContext.isPathLocal(""));
190
191         // even if the path is in local format, match the primary path (first 3 elements) and return true
192         clusterWrapper.setSelfAddress(new Address("akka", "test"));
193         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
194         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
195
196         clusterWrapper.setSelfAddress(new Address("akka", "test"));
197         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
198         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
199
200         clusterWrapper.setSelfAddress(new Address("akka", "test"));
201         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
202         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
203
204         // self address of remote format,but Tx path local format.
205         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
206         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
207         assertEquals(true, actorContext.isPathLocal(
208             "akka://system/user/shardmanager/shard/transaction"));
209
210         // self address of local format,but Tx path remote format.
211         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
212         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
213         assertEquals(false, actorContext.isPathLocal(
214             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
215
216         //local path but not same
217         clusterWrapper.setSelfAddress(new Address("akka", "test"));
218         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
219         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
220
221         //ip and port same
222         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
223         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
224         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
225
226         // forward-slash missing in address
227         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
228         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
229         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
230
231         //ips differ
232         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
233         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
234         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
235
236         //ports differ
237         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
238         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
240     }
241
242     @Test
243     public void testResolvePathForRemoteActor() {
244         ActorContext actorContext =
245                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
246                         ClusterWrapper.class),
247                         mock(Configuration.class));
248
249         String actual = actorContext.resolvePath(
250                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
251                 "akka://system/user/shardmanager/shard/transaction");
252
253         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
254
255         assertEquals(expected, actual);
256     }
257
258     @Test
259     public void testResolvePathForLocalActor() {
260         ActorContext actorContext =
261                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
262                         mock(Configuration.class));
263
264         String actual = actorContext.resolvePath(
265                 "akka://system/user/shardmanager/shard",
266                 "akka://system/user/shardmanager/shard/transaction");
267
268         String expected = "akka://system/user/shardmanager/shard/transaction";
269
270         assertEquals(expected, actual);
271     }
272
273     @Test
274     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
275         ActorContext actorContext =
276                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
277                         mock(Configuration.class));
278
279         String actual = actorContext.resolvePath(
280                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
281                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
282
283         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
284
285         assertEquals(expected, actual);
286     }
287
288     @Test
289     public void testRateLimiting(){
290         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
291
292         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
293         doReturn("config").when(mockDataStoreContext).getDataStoreType();
294         doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
295
296         ActorContext actorContext =
297                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
298                         mock(Configuration.class), mockDataStoreContext);
299
300         // Check that the initial value is being picked up from DataStoreContext
301         assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
302
303         actorContext.setTxCreationLimit(1.0);
304
305         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
306
307
308         StopWatch watch = new StopWatch();
309
310         watch.start();
311
312         actorContext.acquireTxCreationPermit();
313         actorContext.acquireTxCreationPermit();
314         actorContext.acquireTxCreationPermit();
315
316         watch.stop();
317
318         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
319     }
320
321     @Test
322     public void testClientDispatcherIsGlobalDispatcher(){
323
324         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
325
326         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
327         doReturn("config").when(mockDataStoreContext).getDataStoreType();
328         doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
329
330         ActorContext actorContext =
331                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
332                         mock(Configuration.class), mockDataStoreContext);
333
334         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
335
336     }
337
338     @Test
339     public void testClientDispatcherIsNotGlobalDispatcher(){
340
341         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
342
343         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
344         doReturn("config").when(mockDataStoreContext).getDataStoreType();
345         doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
346
347         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
348
349         ActorContext actorContext =
350                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
351                         mock(Configuration.class), mockDataStoreContext);
352
353         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
354
355         actorSystem.shutdown();
356
357     }
358
359     @Test
360     public void testSetDatastoreContext() {
361         new JavaTestKit(getSystem()) {{
362             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
363                             mock(Configuration.class), DatastoreContext.newBuilder().
364                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
365
366             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
367             assertEquals("getTransactionCommitOperationTimeout", 7,
368                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
369
370             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
371                     shardTransactionCommitTimeoutInSeconds(8).build();
372
373             actorContext.setDatastoreContext(newContext);
374
375             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
376
377             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
378
379             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
380             assertEquals("getTransactionCommitOperationTimeout", 8,
381                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
382         }};
383     }
384
385     @Test
386     public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
387
388             TestActorRef<MessageCollectorActor> shardManager =
389                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
390
391             DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
392
393             doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
394             doReturn("config").when(mockDataStoreContext).getDataStoreType();
395             doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
396
397             ActorContext actorContext =
398                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
399                             mock(Configuration.class), mockDataStoreContext) {
400                         @Override
401                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
402                             return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
403                         }
404                     };
405
406
407             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
408             ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
409
410             assertNotNull(actual);
411
412             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
413
414             ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
415
416             assertEquals(cachedSelection, actual);
417
418             // Wait for 200 Milliseconds. The cached entry should have been removed.
419
420             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
421
422             cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
423
424             assertNull(cached);
425
426     }
427
428     @Test
429     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
430
431             TestActorRef<MessageCollectorActor> shardManager =
432                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
433
434             DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
435
436             doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
437             doReturn("config").when(mockDataStoreContext).getDataStoreType();
438             doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
439
440             ActorContext actorContext =
441                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
442                             mock(Configuration.class), mockDataStoreContext) {
443                         @Override
444                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
445                             return Futures.successful((Object) new PrimaryNotFound("foobar"));
446                         }
447                     };
448
449
450             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
451
452             try {
453                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
454                 fail("Expected PrimaryNotFoundException");
455             } catch(PrimaryNotFoundException e){
456
457             }
458
459             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
460
461             assertNull(cached);
462
463     }
464
465     @Test
466     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
467
468             TestActorRef<MessageCollectorActor> shardManager =
469                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
470
471             DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
472
473             doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
474             doReturn("config").when(mockDataStoreContext).getDataStoreType();
475             doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
476
477             ActorContext actorContext =
478                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
479                             mock(Configuration.class), mockDataStoreContext) {
480                         @Override
481                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
482                             return Futures.successful((Object) new ActorNotInitialized());
483                         }
484                     };
485
486
487             Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
488
489             try {
490                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
491                 fail("Expected NotInitializedException");
492             } catch(NotInitializedException e){
493
494             }
495
496             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
497
498             assertNull(cached);
499
500     }
501
502 }