Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtilsTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedAbstractActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Sets;
33 import com.typesafe.config.ConfigFactory;
34 import java.time.Duration;
35 import java.util.Arrays;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class ActorUtilsTest extends AbstractActorTest {
70
71     static final Logger LOG = LoggerFactory.getLogger(ActorUtilsTest.class);
72
73     private static class TestMessage {
74     }
75
76     private static final class MockShardManager extends UntypedAbstractActor {
77         private final Map<String,Object> findPrimaryResponses = new HashMap<>();
78         private final boolean found;
79         private final ActorRef actorRef;
80
81         private MockShardManager(final boolean found, final ActorRef actorRef) {
82
83             this.found = found;
84             this.actorRef = actorRef;
85         }
86
87         @Override public void onReceive(final Object message) {
88             if (message instanceof FindPrimary) {
89                 FindPrimary fp = (FindPrimary)message;
90                 Object resp = findPrimaryResponses.get(fp.getShardName());
91                 if (resp == null) {
92                     LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
93                 } else {
94                     getSender().tell(resp, getSelf());
95                 }
96
97                 return;
98             }
99
100             if (found) {
101                 getSender().tell(new LocalShardFound(actorRef), getSelf());
102             } else {
103                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
104             }
105         }
106
107         void addFindPrimaryResp(final String shardName, final Object resp) {
108             findPrimaryResponses.put(shardName, resp);
109         }
110
111         private static Props props(final boolean found, final ActorRef actorRef) {
112             return Props.create(MockShardManager.class, new MockShardManagerCreator(found, actorRef));
113         }
114
115         private static Props props() {
116             return Props.create(MockShardManager.class, new MockShardManagerCreator());
117         }
118
119         @SuppressWarnings("serial")
120         private static class MockShardManagerCreator implements Creator<MockShardManager> {
121             final boolean found;
122             final ActorRef actorRef;
123
124             MockShardManagerCreator() {
125                 found = false;
126                 actorRef = null;
127             }
128
129             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
130                 this.found = found;
131                 this.actorRef = actorRef;
132             }
133
134             @Override
135             public MockShardManager create() {
136                 return new MockShardManager(found, actorRef);
137             }
138         }
139     }
140
141     @Test
142     public void testFindLocalShardWithShardFound() {
143         final TestKit testKit = new TestKit(getSystem());
144         testKit.within(Duration.ofSeconds(1), () -> {
145             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
146
147             ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
148
149             ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
150                 mock(ClusterWrapper.class), mock(Configuration.class));
151
152             Optional<ActorRef> out = actorUtils.findLocalShard("default");
153
154             assertEquals(shardActorRef, out.get());
155
156             testKit.expectNoMessage();
157             return null;
158         });
159     }
160
161     @Test
162     public void testFindLocalShardWithShardNotFound() {
163         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
164
165         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
166             mock(Configuration.class));
167
168         Optional<ActorRef> out = actorUtils.findLocalShard("default");
169         assertFalse(out.isPresent());
170     }
171
172     @Test
173     public void testExecuteRemoteOperation() {
174         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
175
176         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
177
178         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
179             mock(ClusterWrapper.class), mock(Configuration.class));
180
181         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
182
183         Object out = actorUtils.executeOperation(actor, "hello");
184
185         assertEquals("hello", out);
186     }
187
188     @Test
189     public void testExecuteRemoteOperationAsync() throws Exception {
190         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
191
192         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
193
194         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
195             mock(ClusterWrapper.class), mock(Configuration.class));
196
197         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
198
199         Future<Object> future = actorUtils.executeOperationAsync(actor, "hello");
200
201         Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
202         assertEquals("Result", "hello", result);
203     }
204
205     @Test
206     public void testIsPathLocal() {
207         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
208         ActorUtils actorUtils = null;
209
210         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
211         assertFalse(actorUtils.isPathLocal(null));
212         assertFalse(actorUtils.isPathLocal(""));
213
214         clusterWrapper.setSelfAddress(null);
215         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
216         assertFalse(actorUtils.isPathLocal(""));
217
218         // even if the path is in local format, match the primary path (first 3 elements) and return true
219         clusterWrapper.setSelfAddress(new Address("akka", "test"));
220         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
221         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
222
223         clusterWrapper.setSelfAddress(new Address("akka", "test"));
224         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
225         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
226
227         clusterWrapper.setSelfAddress(new Address("akka", "test"));
228         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
229         assertTrue(actorUtils.isPathLocal("akka://test/user/token2/token3/$a"));
230
231         // self address of remote format,but Tx path local format.
232         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
233         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
234         assertTrue(actorUtils.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
235
236         // self address of local format,but Tx path remote format.
237         clusterWrapper.setSelfAddress(new Address("akka", "system"));
238         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
240
241         //local path but not same
242         clusterWrapper.setSelfAddress(new Address("akka", "test"));
243         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
244         assertTrue(actorUtils.isPathLocal("akka://test1/user/$a"));
245
246         //ip and port same
247         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
248         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
249         assertTrue(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/"));
250
251         // forward-slash missing in address
252         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
253         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
254         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550"));
255
256         //ips differ
257         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
258         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
259         assertFalse(actorUtils.isPathLocal("akka://system@127.1.0.1:2550/"));
260
261         //ports differ
262         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
263         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
264         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2551/"));
265     }
266
267     @Test
268     public void testClientDispatcherIsGlobalDispatcher() {
269         ActorUtils actorUtils = new ActorUtils(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
270                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
271
272         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
273     }
274
275     @Test
276     public void testClientDispatcherIsNotGlobalDispatcher() {
277         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
278                 ConfigFactory.load("application-with-custom-dispatchers.conf"));
279
280         ActorUtils actorUtils = new ActorUtils(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
281                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
282
283         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
284
285         actorSystem.terminate();
286     }
287
288     @Test
289     public void testSetDatastoreContext() {
290         final TestKit testKit = new TestKit(getSystem());
291         ActorUtils actorUtils = new ActorUtils(getSystem(), testKit.getRef(),
292             mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
293             .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
294             new PrimaryShardInfoFutureCache());
295
296         assertEquals("getOperationDuration", 5, actorUtils.getOperationDuration().toSeconds());
297         assertEquals("getTransactionCommitOperationTimeout", 7,
298             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
299
300         DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
301                 .shardTransactionCommitTimeoutInSeconds(8).build();
302
303         DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
304         Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
305
306         actorUtils.setDatastoreContext(mockContextFactory);
307
308         testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
309
310         Assert.assertSame("getDatastoreContext", newContext, actorUtils.getDatastoreContext());
311
312         assertEquals("getOperationDuration", 6, actorUtils.getOperationDuration().toSeconds());
313         assertEquals("getTransactionCommitOperationTimeout", 8,
314             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
315     }
316
317     @Test
318     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
319
320         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
321
322         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
323                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
324                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
325
326         final String expPrimaryPath = "akka://test-system/find-primary-shard";
327         final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
328         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
329                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
330             @Override
331             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
332                 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
333             }
334         };
335
336         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
337         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
338
339         assertNotNull(actual);
340         assertFalse("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
341         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
342                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
343         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
344
345         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
346
347         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
348
349         assertEquals(cachedInfo, actual);
350
351         actorUtils.getPrimaryShardInfoCache().remove("foobar");
352
353         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
354
355         assertNull(cached);
356     }
357
358     @Test
359     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
360
361         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
362
363         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
364                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
365                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
366
367         final DataTree mockDataTree = Mockito.mock(DataTree.class);
368         final String expPrimaryPath = "akka://test-system/find-primary-shard";
369         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
370                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
371             @Override
372             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
373                 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
374             }
375         };
376
377         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
378         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
379
380         assertNotNull(actual);
381         assertTrue("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
382         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
383         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
384                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
385         assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
386
387         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
388
389         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
390
391         assertEquals(cachedInfo, actual);
392
393         actorUtils.getPrimaryShardInfoCache().remove("foobar");
394
395         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
396
397         assertNull(cached);
398     }
399
400     @Test
401     public void testFindPrimaryShardAsyncPrimaryNotFound() {
402         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
403     }
404
405     @Test
406     public void testFindPrimaryShardAsyncActorNotInitialized() {
407         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
408     }
409
410     @SuppressWarnings("checkstyle:IllegalCatch")
411     private static void testFindPrimaryExceptions(final Object expectedException) {
412         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
413
414         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
415                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
416                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
417
418         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
419                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
420             @Override
421             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
422                 return Futures.successful(expectedException);
423             }
424         };
425
426         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
427
428         try {
429             Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
430             fail("Expected" + expectedException.getClass().toString());
431         } catch (Exception e) {
432             if (!expectedException.getClass().isInstance(e)) {
433                 fail("Expected Exception of type " + expectedException.getClass().toString());
434             }
435         }
436
437         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
438
439         assertNull(cached);
440     }
441
442     @Test
443     public void testBroadcast() {
444         ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
445         ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
446
447         TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
448             MockShardManager.props());
449         MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
450         shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
451             shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
452         shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
453             shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
454         shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
455
456         Configuration mockConfig = mock(Configuration.class);
457         doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
458         .getAllShardNames();
459
460         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
461             mock(ClusterWrapper.class), mockConfig,
462             DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
463             new PrimaryShardInfoFutureCache());
464
465         actorUtils.broadcast(v -> new TestMessage(), TestMessage.class);
466
467         MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
468         MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
469     }
470 }