Use ActorSystem.terminate()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Function;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
59 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
60 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Await;
65 import scala.concurrent.Future;
66 import scala.concurrent.duration.Duration;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class ActorContextTest extends AbstractActorTest{
70
71     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
72
    /**
     * Empty marker message. {@code testBroadcast} has the broadcast supplier create instances of this class
     * and then waits for an instance to show up at each collector actor.
     */
    private static class TestMessage {
    }
75
76     private static class MockShardManager extends UntypedActor {
77
78         private final boolean found;
79         private final ActorRef actorRef;
80         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
81
82         private MockShardManager(final boolean found, final ActorRef actorRef){
83
84             this.found = found;
85             this.actorRef = actorRef;
86         }
87
88         @Override public void onReceive(final Object message) throws Exception {
89             if(message instanceof FindPrimary) {
90                 FindPrimary fp = (FindPrimary)message;
91                 Object resp = findPrimaryResponses.get(fp.getShardName());
92                 if(resp == null) {
93                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
94                 } else {
95                     getSender().tell(resp, getSelf());
96                 }
97
98                 return;
99             }
100
101             if(found){
102                 getSender().tell(new LocalShardFound(actorRef), getSelf());
103             } else {
104                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
105             }
106         }
107
108         void addFindPrimaryResp(final String shardName, final Object resp) {
109             findPrimaryResponses.put(shardName, resp);
110         }
111
112         private static Props props(final boolean found, final ActorRef actorRef){
113             return Props.create(new MockShardManagerCreator(found, actorRef) );
114         }
115
116         private static Props props(){
117             return Props.create(new MockShardManagerCreator() );
118         }
119
120         @SuppressWarnings("serial")
121         private static class MockShardManagerCreator implements Creator<MockShardManager> {
122             final boolean found;
123             final ActorRef actorRef;
124
125             MockShardManagerCreator() {
126                 this.found = false;
127                 this.actorRef = null;
128             }
129
130             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
131                 this.found = found;
132                 this.actorRef = actorRef;
133             }
134
135             @Override
136             public MockShardManager create() throws Exception {
137                 return new MockShardManager(found, actorRef);
138             }
139         }
140     }
141
142     @Test
143     public void testFindLocalShardWithShardFound(){
144         new JavaTestKit(getSystem()) {{
145
146             new Within(duration("1 seconds")) {
147                 @Override
148                 protected void run() {
149
150                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
151
152                     ActorRef shardManagerActorRef = getSystem()
153                         .actorOf(MockShardManager.props(true, shardActorRef));
154
155                     ActorContext actorContext =
156                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
157                             mock(Configuration.class));
158
159                     Optional<ActorRef> out = actorContext.findLocalShard("default");
160
161                     assertEquals(shardActorRef, out.get());
162
163
164                     expectNoMsg();
165                 }
166             };
167         }};
168
169     }
170
171     @Test
172     public void testFindLocalShardWithShardNotFound(){
173         new JavaTestKit(getSystem()) {{
174             ActorRef shardManagerActorRef = getSystem()
175                     .actorOf(MockShardManager.props(false, null));
176
177             ActorContext actorContext =
178                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
179                             mock(Configuration.class));
180
181             Optional<ActorRef> out = actorContext.findLocalShard("default");
182             assertTrue(!out.isPresent());
183         }};
184
185     }
186
187     @Test
188     public void testExecuteRemoteOperation() {
189         new JavaTestKit(getSystem()) {{
190             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
191
192             ActorRef shardManagerActorRef = getSystem()
193                     .actorOf(MockShardManager.props(true, shardActorRef));
194
195             ActorContext actorContext =
196                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
197                             mock(Configuration.class));
198
199             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
200
201             Object out = actorContext.executeOperation(actor, "hello");
202
203             assertEquals("hello", out);
204         }};
205     }
206
207     @Test
208     public void testExecuteRemoteOperationAsync() {
209         new JavaTestKit(getSystem()) {{
210             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
211
212             ActorRef shardManagerActorRef = getSystem()
213                     .actorOf(MockShardManager.props(true, shardActorRef));
214
215             ActorContext actorContext =
216                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
217                             mock(Configuration.class));
218
219             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
220
221             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
222
223             try {
224                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
225                 assertEquals("Result", "hello", result);
226             } catch(Exception e) {
227                 throw new AssertionError(e);
228             }
229         }};
230     }
231
232     @Test
233     public void testIsPathLocal() {
234         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
235         ActorContext actorContext = null;
236
237         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
238         assertEquals(false, actorContext.isPathLocal(null));
239         assertEquals(false, actorContext.isPathLocal(""));
240
241         clusterWrapper.setSelfAddress(null);
242         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
243         assertEquals(false, actorContext.isPathLocal(""));
244
245         // even if the path is in local format, match the primary path (first 3 elements) and return true
246         clusterWrapper.setSelfAddress(new Address("akka", "test"));
247         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
249
250         clusterWrapper.setSelfAddress(new Address("akka", "test"));
251         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
253
254         clusterWrapper.setSelfAddress(new Address("akka", "test"));
255         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
256         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
257
258         // self address of remote format,but Tx path local format.
259         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
260         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261         assertEquals(true, actorContext.isPathLocal(
262             "akka://system/user/shardmanager/shard/transaction"));
263
264         // self address of local format,but Tx path remote format.
265         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
266         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
267         assertEquals(false, actorContext.isPathLocal(
268             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
269
270         //local path but not same
271         clusterWrapper.setSelfAddress(new Address("akka", "test"));
272         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
273         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
274
275         //ip and port same
276         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
277         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
278         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
279
280         // forward-slash missing in address
281         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
282         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
283         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
284
285         //ips differ
286         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
287         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
288         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
289
290         //ports differ
291         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
292         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
293         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
294     }
295
296     @Test
297     public void testClientDispatcherIsGlobalDispatcher(){
298         ActorContext actorContext =
299                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
300                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
301
302         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
303     }
304
305     @Test
306     public void testClientDispatcherIsNotGlobalDispatcher(){
307         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
308
309         ActorContext actorContext =
310                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
311                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
312
313         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
314
315         actorSystem.terminate();
316     }
317
318     @Test
319     public void testSetDatastoreContext() {
320         new JavaTestKit(getSystem()) {{
321             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
322                             mock(Configuration.class), DatastoreContext.newBuilder().
323                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
324
325             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
326             assertEquals("getTransactionCommitOperationTimeout", 7,
327                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
328
329             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
330                     shardTransactionCommitTimeoutInSeconds(8).build();
331
332             DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
333             Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
334
335             actorContext.setDatastoreContext(mockContextFactory);
336
337             expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
338
339             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
340
341             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
342             assertEquals("getTransactionCommitOperationTimeout", 8,
343                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
344         }};
345     }
346
347     @Test
348     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
349
350             TestActorRef<MessageCollectorActor> shardManager =
351                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
352
353             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
354                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
355                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
356
357             final String expPrimaryPath = "akka://test-system/find-primary-shard";
358             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
359             ActorContext actorContext =
360                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
361                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
362                         @Override
363                         protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
364                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
365                         }
366                     };
367
368             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
369             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
370
371             assertNotNull(actual);
372             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
373             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
374                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
375             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
376
377             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
378
379             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
380
381             assertEquals(cachedInfo, actual);
382
383             actorContext.getPrimaryShardInfoCache().remove("foobar");
384
385             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
386
387             assertNull(cached);
388     }
389
390     @Test
391     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
392
393             TestActorRef<MessageCollectorActor> shardManager =
394                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
395
396             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
397                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
398                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
399
400             final DataTree mockDataTree = Mockito.mock(DataTree.class);
401             final String expPrimaryPath = "akka://test-system/find-primary-shard";
402             ActorContext actorContext =
403                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
404                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
405                         @Override
406                         protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
407                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
408                         }
409                     };
410
411             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
412             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
413
414             assertNotNull(actual);
415             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
416             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
417             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
418                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
419             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
420
421             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
422
423             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
424
425             assertEquals(cachedInfo, actual);
426
427             actorContext.getPrimaryShardInfoCache().remove("foobar");
428
429             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
430
431             assertNull(cached);
432     }
433
    /**
     * Runs the shared failure scenario in {@code testFindPrimaryExceptions} with a
     * {@link PrimaryNotFoundException} as the stubbed reply.
     */
    @Test
    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
        testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
    }
438
    /**
     * Runs the shared failure scenario in {@code testFindPrimaryExceptions} with a
     * {@link NotInitializedException} as the stubbed reply.
     */
    @Test
    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
        testFindPrimaryExceptions(new NotInitializedException("not initialized"));
    }
443
444     private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
445         TestActorRef<MessageCollectorActor> shardManager =
446             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
447
448         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
449             logicalStoreType(LogicalDatastoreType.CONFIGURATION).
450             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
451
452         ActorContext actorContext =
453             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
454                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
455                 @Override
456                 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
457                     return Futures.successful(expectedException);
458                 }
459             };
460
461         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
462
463         try {
464             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
465             fail("Expected" + expectedException.getClass().toString());
466         } catch(Exception e){
467             if(!expectedException.getClass().isInstance(e)) {
468                 fail("Expected Exception of type " + expectedException.getClass().toString());
469             }
470         }
471
472         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
473
474         assertNull(cached);
475     }
476
477     @Test
478     public void testBroadcast() {
479         new JavaTestKit(getSystem()) {{
480             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
481             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
482
483             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
484             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
485             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
486                     DataStoreVersions.CURRENT_VERSION));
487             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
488                     DataStoreVersions.CURRENT_VERSION));
489             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
490
491             Configuration mockConfig = mock(Configuration.class);
492             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
493                     when(mockConfig).getAllShardNames();
494
495             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
496                     mock(ClusterWrapper.class), mockConfig,
497                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
498
499             actorContext.broadcast(new Function<Short, Object>() {
500                 @Override
501                 public Object apply(final Short v) {
502                     return new TestMessage();
503                 }
504             }, TestMessage.class);
505
506             MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
507             MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
508         }};
509     }
510
511 }