6adba42c098d8e65dba74a735cc88e821884c940
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.ORMap;
23 import akka.cluster.ddata.ORSet;
24 import akka.cluster.ddata.typed.javadsl.DistributedData;
25 import akka.cluster.ddata.typed.javadsl.Replicator;
26 import com.typesafe.config.Config;
27 import com.typesafe.config.ConfigFactory;
28 import java.time.Duration;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionStage;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Supplier;
39 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
41 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
42 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
49 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
55 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
56 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
57 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
58 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
59 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
60 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public abstract class AbstractNativeEosTest {
67
68     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
69     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
70
71     protected static final String DEFAULT_DATACENTER = "dc-default";
72
73     protected static final List<String> TWO_NODE_SEED_NODES =
74             List.of("akka://ClusterSystem@127.0.0.1:2550",
75                     "akka://ClusterSystem@127.0.0.1:2551");
76
77     protected static final List<String> THREE_NODE_SEED_NODES =
78             List.of("akka://ClusterSystem@127.0.0.1:2550",
79                     "akka://ClusterSystem@127.0.0.1:2551",
80                     "akka://ClusterSystem@127.0.0.1:2552");
81
82     protected static final List<String> DATACENTER_SEED_NODES =
83             List.of("akka://ClusterSystem@127.0.0.1:2550",
84                     "akka://ClusterSystem@127.0.0.1:2551",
85                     "akka://ClusterSystem@127.0.0.1:2552",
86                     "akka://ClusterSystem@127.0.0.1:2553");
87
88     private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
89
90     protected static BindingCodecContext CODEC_CONTEXT
91             = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
92
93     private static final String REMOTE_PROTOCOL = "akka";
94     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
95     private static final String ROLE_PARAM = "akka.cluster.roles";
96     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
97     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
98
99     protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
100                                                                            final List<String> seedNodes)
101             throws ExecutionException, InterruptedException {
102         final Map<String, Object> overrides = new HashMap<>();
103         overrides.put(PORT_PARAM, port);
104         overrides.put(ROLE_PARAM, roles);
105         if (!seedNodes.isEmpty()) {
106             overrides.put(SEED_NODES_PARAM, seedNodes);
107         }
108
109         final Config config = ConfigFactory.parseMap(overrides)
110                 .withFallback(ConfigFactory.load());
111
112         // Create a classic Akka system since thats what we will have in osgi
113         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
114
115         return new MockNativeEntityOwnershipService(system);
116     }
117
118     protected static ClusterNode startupRemote(final int port, final List<String> roles)
119             throws ExecutionException, InterruptedException {
120         return startup(port, roles, THREE_NODE_SEED_NODES);
121     }
122
123     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
124             throws ExecutionException, InterruptedException {
125         return startup(port, roles, seedNodes);
126     }
127
128     protected static ClusterNode startup(final int port, final List<String> roles)
129             throws ExecutionException, InterruptedException {
130         return startup(port, roles, List.of());
131     }
132
133     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
134             throws ExecutionException, InterruptedException {
135
136         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
137     }
138
139     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
140                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
141             throws ExecutionException, InterruptedException {
142         // Override the configuration
143         final Map<String, Object> overrides = new HashMap<>(4);
144         overrides.put(PORT_PARAM, port);
145         overrides.put(ROLE_PARAM, roles);
146         if (!seedNodes.isEmpty()) {
147             overrides.put(SEED_NODES_PARAM, seedNodes);
148         }
149
150         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
151
152         // Create a classic Akka system since thats what we will have in osgi
153         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
154         final ActorRef<BootstrapCommand> eosBootstrap =
155                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
156
157         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
158                 GetRunningContext::new,
159                 Duration.ofSeconds(5),
160                 Adapter.toTyped(system.scheduler()));
161         final RunningContext runningContext = ask.toCompletableFuture().get();
162
163         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
164                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
165     }
166
167     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
168                                                        final List<String> seedNodes, final String dataCenter)
169             throws ExecutionException, InterruptedException {
170         final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
171         final ActorRef<BootstrapCommand> eosBootstrap =
172                 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
173
174         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
175                 GetRunningContext::new,
176                 Duration.ofSeconds(5),
177                 Adapter.toTyped(system.scheduler()));
178         final RunningContext runningContext = ask.toCompletableFuture().get();
179
180         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
181                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
182     }
183
184     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
185                                                                final List<String> seedNodes) {
186         final Map<String, Object> overrides = new HashMap<>();
187         overrides.put(PORT_PARAM, port);
188         overrides.put(ROLE_PARAM, roles);
189         if (!seedNodes.isEmpty()) {
190             overrides.put(SEED_NODES_PARAM, seedNodes);
191         }
192
193         final Config config = ConfigFactory.parseMap(overrides)
194                 .withFallback(ConfigFactory.load());
195
196         // Create a classic Akka system since thats what we will have in osgi
197         return akka.actor.ActorSystem.create("ClusterSystem", config);
198     }
199
200     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
201                                                                final List<String> seedNodes, final String dataCenter) {
202         final Map<String, Object> overrides = new HashMap<>();
203         overrides.put(PORT_PARAM, port);
204         overrides.put(ROLE_PARAM, roles);
205         if (!seedNodes.isEmpty()) {
206             overrides.put(SEED_NODES_PARAM, seedNodes);
207         }
208         overrides.put(DATA_CENTER_PARAM, dataCenter);
209
210         final Config config = ConfigFactory.parseMap(overrides)
211                 .withFallback(ConfigFactory.load());
212
213         // Create a classic Akka system since thats what we will have in osgi
214         return akka.actor.ActorSystem.create("ClusterSystem", config);
215     }
216
217     private static Behavior<BootstrapCommand> rootBehavior() {
218         return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
219     }
220
221     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
222         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
223         registerCandidates(candidateRegistry, entity, members);
224     }
225
226     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
227                                              final DOMEntity entity, final String... members) {
228         for (final String member : members) {
229             candidateRegistry.tell(new RegisterCandidate(entity, member));
230         }
231     }
232
233     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
234                                                final String... members) {
235         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
236         for (final String member : members) {
237             candidateRegistry.tell(new UnregisterCandidate(entity, member));
238         }
239     }
240
241     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
242         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
243         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
244         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
245
246         return listener;
247     }
248
249     protected static void reachableMember(final ClusterNode node, final String... role) {
250         reachableMember(node.getOwnerSupervisor(), role);
251     }
252
253     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
254                                           final String... role) {
255         ownerSupervisor.tell(new MemberReachableEvent(
256                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
257     }
258
259     protected static void unreachableMember(final ClusterNode node, final String... role) {
260         unreachableMember(node.getOwnerSupervisor(), role);
261     }
262
263     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
264                                             final String... role) {
265         ownerSupervisor.tell(new MemberUnreachableEvent(
266                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
267     }
268
269     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
270         await().atMost(Duration.ofSeconds(15)).until(() -> {
271             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
272             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
273                     AskPattern.ask(distributedData.replicator(),
274                             replyTo -> new Replicator.Get<>(
275                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
276                             Duration.ofSeconds(5),
277                             clusterNode.getActorSystem().scheduler());
278
279             final Replicator.GetResponse<LWWRegister<String>> response =
280                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
281
282             if (response instanceof Replicator.GetSuccess) {
283                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
284                 return !owner.isEmpty();
285             }
286
287             return false;
288         });
289     }
290
291     protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
292                                                      final String candidate) {
293         await().atMost(Duration.ofSeconds(15)).until(() -> {
294             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
295
296             final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
297                     AskPattern.ask(distributedData.replicator(),
298                             replyTo -> new Replicator.Get<>(
299                                     CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
300                             Duration.ofSeconds(5),
301                             clusterNode.getActorSystem().scheduler());
302
303             final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
304                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
305
306             if (response instanceof Replicator.GetSuccess) {
307                 final Map<DOMEntity, ORSet<String>> entries =
308                         ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
309
310                 return entries.get(entity).contains(candidate);
311
312             }
313             return false;
314         });
315     }
316
317     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
318         final CompletionStage<OwnerSupervisorReply> ask =
319                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
320                         ActivateDataCenter::new,
321                         Duration.ofSeconds(20),
322                         clusterNode.actorSystem.scheduler());
323         return ask.toCompletableFuture();
324     }
325
326     protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
327         final CompletionStage<OwnerSupervisorReply> ask =
328                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
329                         DeactivateDataCenter::new,
330                         Duration.ofSeconds(20),
331                         clusterNode.actorSystem.scheduler());
332         return ask.toCompletableFuture();
333     }
334
335     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
336                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
337         await().until(() -> !listener.getChanges().isEmpty());
338
339         await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
340             final var changes = listener.getChanges();
341             final var domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
342             assertEquals(entity, domEntityOwnershipChange.entity());
343
344             assertEquals(hasOwner, domEntityOwnershipChange.change().hasOwner());
345             assertEquals(isOwner, domEntityOwnershipChange.change().isOwner());
346             assertEquals(wasOwner, domEntityOwnershipChange.change().wasOwner());
347         });
348     }
349
350     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
351         verifyNoNotifications(listener, 2);
352     }
353
354     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
355         await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
356     }
357
358     protected static void verifyNoAdditionalNotifications(
359             final MockEntityOwnershipListener listener, final long delaySeconds) {
360         listener.resetListener();
361         verifyNoNotifications(listener, delaySeconds);
362     }
363
364     protected static final class ClusterNode {
365         private final int port;
366         private final List<String> roles;
367         private final akka.actor.typed.ActorSystem<Void> actorSystem;
368         private final ActorRef<BootstrapCommand> eosBootstrap;
369         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
370         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
371         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
372
373         private ClusterNode(final int port,
374                             final List<String> roles,
375                             final ActorSystem actorSystem,
376                             final ActorRef<BootstrapCommand> eosBootstrap,
377                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
378                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
379                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
380             this.port = port;
381             this.roles = roles;
382             this.actorSystem = Adapter.toTyped(actorSystem);
383             this.eosBootstrap = eosBootstrap;
384             this.listenerRegistry = listenerRegistry;
385             this.candidateRegistry = candidateRegistry;
386             this.ownerSupervisor = ownerSupervisor;
387         }
388
389         public int getPort() {
390             return port;
391         }
392
393         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
394             return actorSystem;
395         }
396
397         public ActorRef<BootstrapCommand> getEosBootstrap() {
398             return eosBootstrap;
399         }
400
401         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
402             return listenerRegistry;
403         }
404
405         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
406             return candidateRegistry;
407         }
408
409         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
410             return ownerSupervisor;
411         }
412
413         public List<String> getRoles() {
414             return roles;
415         }
416     }
417
418     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
419         private final List<EntityOwnerChanged> changes = new ArrayList<>();
420         private final String member;
421         private final Logger log;
422
423         public MockEntityOwnershipListener(final String member) {
424             log = LoggerFactory.getLogger("EOS-listener-" + member);
425             this.member = member;
426         }
427
428         @Override
429         public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
430                 final boolean inJeopardy) {
431             final var changed = new EntityOwnerChanged(entity, change, inJeopardy);
432             log.info("{} Received ownershipCHanged: {}", member, changed);
433             log.info("{} changes: {}", member, changes.size());
434             changes.add(changed);
435         }
436
437         public List<EntityOwnerChanged> getChanges() {
438             return changes;
439         }
440
441         public void resetListener() {
442             changes.clear();
443         }
444     }
445
446     protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
447         private final ActorSystem classicActorSystem;
448
449         protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
450                 throws ExecutionException, InterruptedException {
451             super(classicActorSystem, CODEC_CONTEXT);
452             this.classicActorSystem = classicActorSystem;
453         }
454
455         protected void reachableMember(final String... role) {
456             AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
457         }
458
459         public void unreachableMember(final String... role) {
460             AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
461         }
462
463         public ActorSystem getActorSystem() {
464             return classicActorSystem;
465         }
466     }
467 }