Add unit test for FrontedMetadata memory leaks
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtilsTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedAbstractActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.time.Duration;
36 import java.util.Arrays;
37 import java.util.Map;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class ActorUtilsTest extends AbstractActorTest {
70
71     static final Logger LOG = LoggerFactory.getLogger(ActorUtilsTest.class);
72
73     private static class TestMessage {
74     }
75
76     private static final class MockShardManager extends UntypedAbstractActor {
77
78         private final boolean found;
79         private final ActorRef actorRef;
80         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
81
82         private MockShardManager(final boolean found, final ActorRef actorRef) {
83
84             this.found = found;
85             this.actorRef = actorRef;
86         }
87
88         @Override public void onReceive(final Object message) {
89             if (message instanceof FindPrimary) {
90                 FindPrimary fp = (FindPrimary)message;
91                 Object resp = findPrimaryResponses.get(fp.getShardName());
92                 if (resp == null) {
93                     LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
94                 } else {
95                     getSender().tell(resp, getSelf());
96                 }
97
98                 return;
99             }
100
101             if (found) {
102                 getSender().tell(new LocalShardFound(actorRef), getSelf());
103             } else {
104                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
105             }
106         }
107
108         void addFindPrimaryResp(final String shardName, final Object resp) {
109             findPrimaryResponses.put(shardName, resp);
110         }
111
112         private static Props props(final boolean found, final ActorRef actorRef) {
113             return Props.create(new MockShardManagerCreator(found, actorRef));
114         }
115
116         private static Props props() {
117             return Props.create(new MockShardManagerCreator());
118         }
119
120         @SuppressWarnings("serial")
121         private static class MockShardManagerCreator implements Creator<MockShardManager> {
122             final boolean found;
123             final ActorRef actorRef;
124
125             MockShardManagerCreator() {
126                 this.found = false;
127                 this.actorRef = null;
128             }
129
130             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
131                 this.found = found;
132                 this.actorRef = actorRef;
133             }
134
135             @Override
136             public MockShardManager create() {
137                 return new MockShardManager(found, actorRef);
138             }
139         }
140     }
141
142     @Test
143     public void testFindLocalShardWithShardFound() {
144         final TestKit testKit = new TestKit(getSystem());
145         testKit.within(Duration.ofSeconds(1), () -> {
146             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
147
148             ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
149
150             ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
151                 mock(ClusterWrapper.class), mock(Configuration.class));
152
153             Optional<ActorRef> out = actorUtils.findLocalShard("default");
154
155             assertEquals(shardActorRef, out.get());
156
157             testKit.expectNoMessage();
158             return null;
159         });
160     }
161
162     @Test
163     public void testFindLocalShardWithShardNotFound() {
164         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
165
166         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
167             mock(Configuration.class));
168
169         Optional<ActorRef> out = actorUtils.findLocalShard("default");
170         assertFalse(out.isPresent());
171     }
172
173     @Test
174     public void testExecuteRemoteOperation() {
175         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
176
177         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
178
179         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
180             mock(ClusterWrapper.class), mock(Configuration.class));
181
182         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
183
184         Object out = actorUtils.executeOperation(actor, "hello");
185
186         assertEquals("hello", out);
187     }
188
189     @Test
190     public void testExecuteRemoteOperationAsync() throws Exception {
191         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
192
193         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
194
195         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
196             mock(ClusterWrapper.class), mock(Configuration.class));
197
198         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
199
200         Future<Object> future = actorUtils.executeOperationAsync(actor, "hello");
201
202         Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
203         assertEquals("Result", "hello", result);
204     }
205
206     @Test
207     public void testIsPathLocal() {
208         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
209         ActorUtils actorUtils = null;
210
211         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
212         assertFalse(actorUtils.isPathLocal(null));
213         assertFalse(actorUtils.isPathLocal(""));
214
215         clusterWrapper.setSelfAddress(null);
216         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
217         assertFalse(actorUtils.isPathLocal(""));
218
219         // even if the path is in local format, match the primary path (first 3 elements) and return true
220         clusterWrapper.setSelfAddress(new Address("akka", "test"));
221         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
222         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
223
224         clusterWrapper.setSelfAddress(new Address("akka", "test"));
225         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
227
228         clusterWrapper.setSelfAddress(new Address("akka", "test"));
229         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
230         assertTrue(actorUtils.isPathLocal("akka://test/user/token2/token3/$a"));
231
232         // self address of remote format,but Tx path local format.
233         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
234         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertTrue(actorUtils.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
236
237         // self address of local format,but Tx path remote format.
238         clusterWrapper.setSelfAddress(new Address("akka", "system"));
239         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
241
242         //local path but not same
243         clusterWrapper.setSelfAddress(new Address("akka", "test"));
244         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
245         assertTrue(actorUtils.isPathLocal("akka://test1/user/$a"));
246
247         //ip and port same
248         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
249         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
250         assertTrue(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/"));
251
252         // forward-slash missing in address
253         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
254         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
255         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550"));
256
257         //ips differ
258         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
259         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
260         assertFalse(actorUtils.isPathLocal("akka://system@127.1.0.1:2550/"));
261
262         //ports differ
263         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
264         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
265         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2551/"));
266     }
267
268     @Test
269     public void testClientDispatcherIsGlobalDispatcher() {
270         ActorUtils actorUtils = new ActorUtils(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
271                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
272
273         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
274     }
275
276     @Test
277     public void testClientDispatcherIsNotGlobalDispatcher() {
278         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
279                 ConfigFactory.load("application-with-custom-dispatchers.conf"));
280
281         ActorUtils actorUtils = new ActorUtils(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
282                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
283
284         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
285
286         actorSystem.terminate();
287     }
288
289     @Test
290     public void testSetDatastoreContext() {
291         final TestKit testKit = new TestKit(getSystem());
292         ActorUtils actorUtils = new ActorUtils(getSystem(), testKit.getRef(),
293             mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
294             .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
295             new PrimaryShardInfoFutureCache());
296
297         assertEquals("getOperationDuration", 5, actorUtils.getOperationDuration().toSeconds());
298         assertEquals("getTransactionCommitOperationTimeout", 7,
299             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
300
301         DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
302                 .shardTransactionCommitTimeoutInSeconds(8).build();
303
304         DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
305         Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
306
307         actorUtils.setDatastoreContext(mockContextFactory);
308
309         testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
310
311         Assert.assertSame("getDatastoreContext", newContext, actorUtils.getDatastoreContext());
312
313         assertEquals("getOperationDuration", 6, actorUtils.getOperationDuration().toSeconds());
314         assertEquals("getTransactionCommitOperationTimeout", 8,
315             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
316     }
317
318     @Test
319     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
320
321         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
322
323         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
324                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
325                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
326
327         final String expPrimaryPath = "akka://test-system/find-primary-shard";
328         final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
329         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
330                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
331             @Override
332             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
333                 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
334             }
335         };
336
337         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
338         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
339
340         assertNotNull(actual);
341         assertFalse("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
342         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
343                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
344         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
345
346         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
347
348         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
349
350         assertEquals(cachedInfo, actual);
351
352         actorUtils.getPrimaryShardInfoCache().remove("foobar");
353
354         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
355
356         assertNull(cached);
357     }
358
359     @Test
360     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
361
362         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
363
364         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
365                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
366                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
367
368         final DataTree mockDataTree = Mockito.mock(DataTree.class);
369         final String expPrimaryPath = "akka://test-system/find-primary-shard";
370         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
371                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
372             @Override
373             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
374                 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
375             }
376         };
377
378         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
379         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
380
381         assertNotNull(actual);
382         assertTrue("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
383         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
384         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
385                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
386         assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
387
388         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
389
390         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
391
392         assertEquals(cachedInfo, actual);
393
394         actorUtils.getPrimaryShardInfoCache().remove("foobar");
395
396         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
397
398         assertNull(cached);
399     }
400
401     @Test
402     public void testFindPrimaryShardAsyncPrimaryNotFound() {
403         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
404     }
405
406     @Test
407     public void testFindPrimaryShardAsyncActorNotInitialized() {
408         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
409     }
410
411     @SuppressWarnings("checkstyle:IllegalCatch")
412     private static void testFindPrimaryExceptions(final Object expectedException) {
413         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
414
415         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
416                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
417                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
418
419         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
420                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
421             @Override
422             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
423                 return Futures.successful(expectedException);
424             }
425         };
426
427         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
428
429         try {
430             Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
431             fail("Expected" + expectedException.getClass().toString());
432         } catch (Exception e) {
433             if (!expectedException.getClass().isInstance(e)) {
434                 fail("Expected Exception of type " + expectedException.getClass().toString());
435             }
436         }
437
438         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
439
440         assertNull(cached);
441     }
442
443     @Test
444     public void testBroadcast() {
445         ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
446         ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
447
448         TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
449             MockShardManager.props());
450         MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
451         shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
452             shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
453         shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
454             shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
455         shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
456
457         Configuration mockConfig = mock(Configuration.class);
458         doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
459         .getAllShardNames();
460
461         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
462             mock(ClusterWrapper.class), mockConfig,
463             DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
464             new PrimaryShardInfoFutureCache());
465
466         actorUtils.broadcast(v -> new TestMessage(), TestMessage.class);
467
468         MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
469         MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
470     }
471 }