Move {Identifiable,Persistent,}Payload
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.ORMap;
23 import akka.cluster.ddata.ORSet;
24 import akka.cluster.ddata.typed.javadsl.DistributedData;
25 import akka.cluster.ddata.typed.javadsl.Replicator;
26 import com.typesafe.config.Config;
27 import com.typesafe.config.ConfigFactory;
28 import java.time.Duration;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionStage;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Supplier;
39 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
41 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
42 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
49 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
55 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
56 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
57 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
58 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
59 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
60 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public abstract class AbstractNativeEosTest {
66
67     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
68     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
69
70     protected static final String DEFAULT_DATACENTER = "dc-default";
71
72     protected static final List<String> TWO_NODE_SEED_NODES =
73             List.of("akka://ClusterSystem@127.0.0.1:2550",
74                     "akka://ClusterSystem@127.0.0.1:2551");
75
76     protected static final List<String> THREE_NODE_SEED_NODES =
77             List.of("akka://ClusterSystem@127.0.0.1:2550",
78                     "akka://ClusterSystem@127.0.0.1:2551",
79                     "akka://ClusterSystem@127.0.0.1:2552");
80
81     protected static final List<String> DATACENTER_SEED_NODES =
82             List.of("akka://ClusterSystem@127.0.0.1:2550",
83                     "akka://ClusterSystem@127.0.0.1:2551",
84                     "akka://ClusterSystem@127.0.0.1:2552",
85                     "akka://ClusterSystem@127.0.0.1:2553");
86
87     private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
88
89     protected static BindingCodecContext CODEC_CONTEXT
90             = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
91
92     private static final String REMOTE_PROTOCOL = "akka";
93     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
94     private static final String ROLE_PARAM = "akka.cluster.roles";
95     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
96     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
97
98     protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
99                                                                            final List<String> seedNodes)
100             throws ExecutionException, InterruptedException {
101         final Map<String, Object> overrides = new HashMap<>();
102         overrides.put(PORT_PARAM, port);
103         overrides.put(ROLE_PARAM, roles);
104         if (!seedNodes.isEmpty()) {
105             overrides.put(SEED_NODES_PARAM, seedNodes);
106         }
107
108         final Config config = ConfigFactory.parseMap(overrides)
109                 .withFallback(ConfigFactory.load());
110
111         // Create a classic Akka system since thats what we will have in osgi
112         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
113
114         return new MockNativeEntityOwnershipService(system);
115     }
116
117     protected static ClusterNode startupRemote(final int port, final List<String> roles)
118             throws ExecutionException, InterruptedException {
119         return startup(port, roles, THREE_NODE_SEED_NODES);
120     }
121
122     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
123             throws ExecutionException, InterruptedException {
124         return startup(port, roles, seedNodes);
125     }
126
127     protected static ClusterNode startup(final int port, final List<String> roles)
128             throws ExecutionException, InterruptedException {
129         return startup(port, roles, List.of());
130     }
131
132     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
133             throws ExecutionException, InterruptedException {
134
135         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
136     }
137
138     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
139                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
140             throws ExecutionException, InterruptedException {
141         // Override the configuration
142         final Map<String, Object> overrides = new HashMap<>(4);
143         overrides.put(PORT_PARAM, port);
144         overrides.put(ROLE_PARAM, roles);
145         if (!seedNodes.isEmpty()) {
146             overrides.put(SEED_NODES_PARAM, seedNodes);
147         }
148
149         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
150
151         // Create a classic Akka system since thats what we will have in osgi
152         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
153         final ActorRef<BootstrapCommand> eosBootstrap =
154                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
155
156         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
157                 GetRunningContext::new,
158                 Duration.ofSeconds(5),
159                 Adapter.toTyped(system.scheduler()));
160         final RunningContext runningContext = ask.toCompletableFuture().get();
161
162         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
163                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
164     }
165
166     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
167                                                        final List<String> seedNodes, final String dataCenter)
168             throws ExecutionException, InterruptedException {
169         final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
170         final ActorRef<BootstrapCommand> eosBootstrap =
171                 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
172
173         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
174                 GetRunningContext::new,
175                 Duration.ofSeconds(5),
176                 Adapter.toTyped(system.scheduler()));
177         final RunningContext runningContext = ask.toCompletableFuture().get();
178
179         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
180                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
181     }
182
183     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
184                                                                final List<String> seedNodes) {
185         final Map<String, Object> overrides = new HashMap<>();
186         overrides.put(PORT_PARAM, port);
187         overrides.put(ROLE_PARAM, roles);
188         if (!seedNodes.isEmpty()) {
189             overrides.put(SEED_NODES_PARAM, seedNodes);
190         }
191
192         final Config config = ConfigFactory.parseMap(overrides)
193                 .withFallback(ConfigFactory.load());
194
195         // Create a classic Akka system since thats what we will have in osgi
196         return akka.actor.ActorSystem.create("ClusterSystem", config);
197     }
198
199     protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
200                                                                final List<String> seedNodes, final String dataCenter) {
201         final Map<String, Object> overrides = new HashMap<>();
202         overrides.put(PORT_PARAM, port);
203         overrides.put(ROLE_PARAM, roles);
204         if (!seedNodes.isEmpty()) {
205             overrides.put(SEED_NODES_PARAM, seedNodes);
206         }
207         overrides.put(DATA_CENTER_PARAM, dataCenter);
208
209         final Config config = ConfigFactory.parseMap(overrides)
210                 .withFallback(ConfigFactory.load());
211
212         // Create a classic Akka system since thats what we will have in osgi
213         return akka.actor.ActorSystem.create("ClusterSystem", config);
214     }
215
216     private static Behavior<BootstrapCommand> rootBehavior() {
217         return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
218     }
219
220     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
221         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
222         registerCandidates(candidateRegistry, entity, members);
223     }
224
225     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
226                                              final DOMEntity entity, final String... members) {
227         for (final String member : members) {
228             candidateRegistry.tell(new RegisterCandidate(entity, member));
229         }
230     }
231
232     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
233                                                final String... members) {
234         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
235         for (final String member : members) {
236             candidateRegistry.tell(new UnregisterCandidate(entity, member));
237         }
238     }
239
240     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
241         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
242         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
243         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
244
245         return listener;
246     }
247
248     protected static void reachableMember(final ClusterNode node, final String... role) {
249         reachableMember(node.getOwnerSupervisor(), role);
250     }
251
252     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
253                                           final String... role) {
254         ownerSupervisor.tell(new MemberReachableEvent(
255                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
256     }
257
258     protected static void unreachableMember(final ClusterNode node, final String... role) {
259         unreachableMember(node.getOwnerSupervisor(), role);
260     }
261
262     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
263                                             final String... role) {
264         ownerSupervisor.tell(new MemberUnreachableEvent(
265                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
266     }
267
268     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
269         await().atMost(Duration.ofSeconds(15)).until(() -> {
270             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
271             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
272                     AskPattern.ask(distributedData.replicator(),
273                             replyTo -> new Replicator.Get<>(
274                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
275                             Duration.ofSeconds(5),
276                             clusterNode.getActorSystem().scheduler());
277
278             final Replicator.GetResponse<LWWRegister<String>> response =
279                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
280
281             if (response instanceof Replicator.GetSuccess) {
282                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
283                 return !owner.isEmpty();
284             }
285
286             return false;
287         });
288     }
289
290     protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
291                                                      final String candidate) {
292         await().atMost(Duration.ofSeconds(15)).until(() -> {
293             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
294
295             final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
296                     AskPattern.ask(distributedData.replicator(),
297                             replyTo -> new Replicator.Get<>(
298                                     CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
299                             Duration.ofSeconds(5),
300                             clusterNode.getActorSystem().scheduler());
301
302             final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
303                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
304
305             if (response instanceof Replicator.GetSuccess) {
306                 final Map<DOMEntity, ORSet<String>> entries =
307                         ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
308
309                 return entries.get(entity).contains(candidate);
310
311             }
312             return false;
313         });
314     }
315
316     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
317         final CompletionStage<OwnerSupervisorReply> ask =
318                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
319                         ActivateDataCenter::new,
320                         Duration.ofSeconds(20),
321                         clusterNode.actorSystem.scheduler());
322         return ask.toCompletableFuture();
323     }
324
325     protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
326         final CompletionStage<OwnerSupervisorReply> ask =
327                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
328                         DeactivateDataCenter::new,
329                         Duration.ofSeconds(20),
330                         clusterNode.actorSystem.scheduler());
331         return ask.toCompletableFuture();
332     }
333
334     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
335                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
336         await().until(() -> !listener.getChanges().isEmpty());
337
338         await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
339             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
340             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
341             assertEquals(entity, domEntityOwnershipChange.getEntity());
342
343             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
344             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
345             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
346         });
347     }
348
349     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
350         verifyNoNotifications(listener, 2);
351     }
352
353     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
354         await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
355     }
356
357     protected static void verifyNoAdditionalNotifications(
358             final MockEntityOwnershipListener listener, final long delaySeconds) {
359         listener.resetListener();
360         verifyNoNotifications(listener, delaySeconds);
361     }
362
363     protected static final class ClusterNode {
364         private final int port;
365         private final List<String> roles;
366         private final akka.actor.typed.ActorSystem<Void> actorSystem;
367         private final ActorRef<BootstrapCommand> eosBootstrap;
368         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
369         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
370         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
371
372         private ClusterNode(final int port,
373                             final List<String> roles,
374                             final ActorSystem actorSystem,
375                             final ActorRef<BootstrapCommand> eosBootstrap,
376                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
377                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
378                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
379             this.port = port;
380             this.roles = roles;
381             this.actorSystem = Adapter.toTyped(actorSystem);
382             this.eosBootstrap = eosBootstrap;
383             this.listenerRegistry = listenerRegistry;
384             this.candidateRegistry = candidateRegistry;
385             this.ownerSupervisor = ownerSupervisor;
386         }
387
388         public int getPort() {
389             return port;
390         }
391
392         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
393             return actorSystem;
394         }
395
396         public ActorRef<BootstrapCommand> getEosBootstrap() {
397             return eosBootstrap;
398         }
399
400         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
401             return listenerRegistry;
402         }
403
404         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
405             return candidateRegistry;
406         }
407
408         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
409             return ownerSupervisor;
410         }
411
412         public List<String> getRoles() {
413             return roles;
414         }
415     }
416
417     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
418
419         private final Logger log;
420
421         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
422         private final String member;
423
424         public MockEntityOwnershipListener(final String member) {
425             log = LoggerFactory.getLogger("EOS-listener-" + member);
426             this.member = member;
427         }
428
429         @Override
430         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
431             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
432             log.info("{} changes: {}", member, changes.size());
433             changes.add(ownershipChange);
434         }
435
436         public List<DOMEntityOwnershipChange> getChanges() {
437             return changes;
438         }
439
440         public void resetListener() {
441             changes.clear();
442         }
443     }
444
445     protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
446         private final ActorSystem classicActorSystem;
447
448         protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
449                 throws ExecutionException, InterruptedException {
450             super(classicActorSystem, CODEC_CONTEXT);
451             this.classicActorSystem = classicActorSystem;
452         }
453
454         protected void reachableMember(final String... role) {
455             AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
456         }
457
458         public void unreachableMember(final String... role) {
459             AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
460         }
461
462         public ActorSystem getActorSystem() {
463             return classicActorSystem;
464         }
465     }
466 }