Add an actor for entity RPC execution.
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.function.Supplier;
37 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
48 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
50 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
51 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
52 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
53 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
54 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
55 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
56 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
57 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
58 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 public abstract class AbstractNativeEosTest {
63
64     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
65     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
66
67     protected static final String DEFAULT_DATACENTER = "dc-default";
68
69     protected static final List<String> TWO_NODE_SEED_NODES =
70             List.of("akka://ClusterSystem@127.0.0.1:2550",
71                     "akka://ClusterSystem@127.0.0.1:2551");
72
73     protected static final List<String> THREE_NODE_SEED_NODES =
74             List.of("akka://ClusterSystem@127.0.0.1:2550",
75                     "akka://ClusterSystem@127.0.0.1:2551",
76                     "akka://ClusterSystem@127.0.0.1:2552");
77
78     protected static final List<String> DATACENTER_SEED_NODES =
79             List.of("akka://ClusterSystem@127.0.0.1:2550",
80                     "akka://ClusterSystem@127.0.0.1:2551",
81                     "akka://ClusterSystem@127.0.0.1:2552",
82                     "akka://ClusterSystem@127.0.0.1:2553");
83
84     private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
85
86     protected static BindingCodecContext CODEC_CONTEXT
87             = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
88
89     private static final String REMOTE_PROTOCOL = "akka";
90     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
91     private static final String ROLE_PARAM = "akka.cluster.roles";
92     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
93     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
94
95     protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
96                                                                            final List<String> seedNodes)
97             throws ExecutionException, InterruptedException {
98         final Map<String, Object> overrides = new HashMap<>();
99         overrides.put(PORT_PARAM, port);
100         overrides.put(ROLE_PARAM, roles);
101         if (!seedNodes.isEmpty()) {
102             overrides.put(SEED_NODES_PARAM, seedNodes);
103         }
104
105         final Config config = ConfigFactory.parseMap(overrides)
106                 .withFallback(ConfigFactory.load());
107
108         // Create a classic Akka system since thats what we will have in osgi
109         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
110
111         return new MockNativeEntityOwnershipService(system);
112     }
113
114     protected static ClusterNode startupRemote(final int port, final List<String> roles)
115             throws ExecutionException, InterruptedException {
116         return startup(port, roles, THREE_NODE_SEED_NODES);
117     }
118
119     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
120             throws ExecutionException, InterruptedException {
121         return startup(port, roles, seedNodes);
122     }
123
124     protected static ClusterNode startup(final int port, final List<String> roles)
125             throws ExecutionException, InterruptedException {
126         return startup(port, roles, List.of());
127     }
128
129     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
130             throws ExecutionException, InterruptedException {
131
132         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
133     }
134
135     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
136                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
137             throws ExecutionException, InterruptedException {
138         // Override the configuration
139         final Map<String, Object> overrides = new HashMap<>(4);
140         overrides.put(PORT_PARAM, port);
141         overrides.put(ROLE_PARAM, roles);
142         if (!seedNodes.isEmpty()) {
143             overrides.put(SEED_NODES_PARAM, seedNodes);
144         }
145
146         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
147
148         // Create a classic Akka system since thats what we will have in osgi
149         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
150         final ActorRef<BootstrapCommand> eosBootstrap =
151                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
152
153         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
154                 GetRunningContext::new,
155                 Duration.ofSeconds(5),
156                 Adapter.toTyped(system.scheduler()));
157         final RunningContext runningContext = ask.toCompletableFuture().get();
158
159         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
160                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
161     }
162
163     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
164                                                        final List<String> seedNodes, final String dataCenter)
165             throws ExecutionException, InterruptedException {
166         final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
167         final ActorRef<BootstrapCommand> eosBootstrap =
168                 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
169
170         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
171                 GetRunningContext::new,
172                 Duration.ofSeconds(5),
173                 Adapter.toTyped(system.scheduler()));
174         final RunningContext runningContext = ask.toCompletableFuture().get();
175
176         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
177                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
178     }
179
180     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
181                                                                final List<String> seedNodes) {
182         final Map<String, Object> overrides = new HashMap<>();
183         overrides.put(PORT_PARAM, port);
184         overrides.put(ROLE_PARAM, roles);
185         if (!seedNodes.isEmpty()) {
186             overrides.put(SEED_NODES_PARAM, seedNodes);
187         }
188
189         final Config config = ConfigFactory.parseMap(overrides)
190                 .withFallback(ConfigFactory.load());
191
192         // Create a classic Akka system since thats what we will have in osgi
193         return akka.actor.ActorSystem.create("ClusterSystem", config);
194     }
195
196     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
197                                                                final List<String> seedNodes, final String dataCenter) {
198         final Map<String, Object> overrides = new HashMap<>();
199         overrides.put(PORT_PARAM, port);
200         overrides.put(ROLE_PARAM, roles);
201         if (!seedNodes.isEmpty()) {
202             overrides.put(SEED_NODES_PARAM, seedNodes);
203         }
204         overrides.put(DATA_CENTER_PARAM, dataCenter);
205
206         final Config config = ConfigFactory.parseMap(overrides)
207                 .withFallback(ConfigFactory.load());
208
209         // Create a classic Akka system since thats what we will have in osgi
210         return akka.actor.ActorSystem.create("ClusterSystem", config);
211     }
212
213     private static Behavior<BootstrapCommand> rootBehavior() {
214         return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
215     }
216
217     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
218         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
219         registerCandidates(candidateRegistry, entity, members);
220     }
221
222     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
223                                              final DOMEntity entity, final String... members) {
224         for (final String member : members) {
225             candidateRegistry.tell(new RegisterCandidate(entity, member));
226         }
227     }
228
229     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
230                                                final String... members) {
231         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
232         for (final String member : members) {
233             candidateRegistry.tell(new UnregisterCandidate(entity, member));
234         }
235     }
236
237     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
238         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
239         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
240         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
241
242         return listener;
243     }
244
245     protected static void reachableMember(final ClusterNode node, final String... role) {
246         reachableMember(node.getOwnerSupervisor(), role);
247     }
248
249     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
250                                           final String... role) {
251         ownerSupervisor.tell(new MemberReachableEvent(
252                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
253     }
254
255     protected static void unreachableMember(final ClusterNode node, final String... role) {
256         unreachableMember(node.getOwnerSupervisor(), role);
257     }
258
259     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
260                                             final String... role) {
261         ownerSupervisor.tell(new MemberUnreachableEvent(
262                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
263     }
264
265     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
266         await().until(() -> {
267             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
268             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
269                     AskPattern.ask(distributedData.replicator(),
270                             replyTo -> new Replicator.Get<>(
271                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
272                             Duration.ofSeconds(5),
273                             clusterNode.getActorSystem().scheduler());
274
275             final Replicator.GetResponse<LWWRegister<String>> response =
276                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
277
278             if (response instanceof Replicator.GetSuccess) {
279                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
280                 return !owner.isEmpty();
281             }
282
283             return false;
284         });
285     }
286
287     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
288         final CompletionStage<OwnerSupervisorReply> ask =
289                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
290                         ActivateDataCenter::new,
291                         Duration.ofSeconds(20),
292                         clusterNode.actorSystem.scheduler());
293         return ask.toCompletableFuture();
294     }
295
296     protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
297         final CompletionStage<OwnerSupervisorReply> ask =
298                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
299                         DeactivateDataCenter::new,
300                         Duration.ofSeconds(20),
301                         clusterNode.actorSystem.scheduler());
302         return ask.toCompletableFuture();
303     }
304
305     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
306                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
307         await().until(() -> !listener.getChanges().isEmpty());
308
309         await().untilAsserted(() -> {
310             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
311             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
312             assertEquals(entity, domEntityOwnershipChange.getEntity());
313
314             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
315             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
316             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
317         });
318     }
319
320     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
321         verifyNoNotifications(listener, 2);
322     }
323
324     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
325         await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
326     }
327
328     protected static void verifyNoAdditionalNotifications(
329             final MockEntityOwnershipListener listener, final long delaySeconds) {
330         listener.resetListener();
331         verifyNoNotifications(listener, delaySeconds);
332     }
333
334     protected static final class ClusterNode {
335         private final int port;
336         private final List<String> roles;
337         private final akka.actor.typed.ActorSystem<Void> actorSystem;
338         private final ActorRef<BootstrapCommand> eosBootstrap;
339         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
340         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
341         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
342
343         private ClusterNode(final int port,
344                             final List<String> roles,
345                             final ActorSystem actorSystem,
346                             final ActorRef<BootstrapCommand> eosBootstrap,
347                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
348                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
349                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
350             this.port = port;
351             this.roles = roles;
352             this.actorSystem = Adapter.toTyped(actorSystem);
353             this.eosBootstrap = eosBootstrap;
354             this.listenerRegistry = listenerRegistry;
355             this.candidateRegistry = candidateRegistry;
356             this.ownerSupervisor = ownerSupervisor;
357         }
358
359         public int getPort() {
360             return port;
361         }
362
363         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
364             return actorSystem;
365         }
366
367         public ActorRef<BootstrapCommand> getEosBootstrap() {
368             return eosBootstrap;
369         }
370
371         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
372             return listenerRegistry;
373         }
374
375         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
376             return candidateRegistry;
377         }
378
379         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
380             return ownerSupervisor;
381         }
382
383         public List<String> getRoles() {
384             return roles;
385         }
386     }
387
388     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
389
390         private final Logger log;
391
392         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
393         private final String member;
394
395         public MockEntityOwnershipListener(final String member) {
396             log = LoggerFactory.getLogger("EOS-listener-" + member);
397             this.member = member;
398         }
399
400         @Override
401         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
402             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
403             log.info("{} changes: {}", member, changes.size());
404             changes.add(ownershipChange);
405         }
406
407         public List<DOMEntityOwnershipChange> getChanges() {
408             return changes;
409         }
410
411         public void resetListener() {
412             changes.clear();
413         }
414     }
415
416     protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
417         private final ActorSystem classicActorSystem;
418
419         protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
420                 throws ExecutionException, InterruptedException {
421             super(classicActorSystem, CODEC_CONTEXT);
422             this.classicActorSystem = classicActorSystem;
423         }
424
425         protected void reachableMember(final String... role) {
426             AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
427         }
428
429         public void unreachableMember(final String... role) {
430             AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
431         }
432
433         public ActorSystem getActorSystem() {
434             return classicActorSystem;
435         }
436     }
437 }