Add integration test with cluster-singleton
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.function.Supplier;
37 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
48 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
50 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
51 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
53 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
54 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public abstract class AbstractNativeEosTest {
59
60     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
61     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
62
63     protected static final String DEFAULT_DATACENTER = "dc-default";
64
65     protected static final List<String> TWO_NODE_SEED_NODES =
66             List.of("akka://ClusterSystem@127.0.0.1:2550",
67                     "akka://ClusterSystem@127.0.0.1:2551");
68
69     protected static final List<String> THREE_NODE_SEED_NODES =
70             List.of("akka://ClusterSystem@127.0.0.1:2550",
71                     "akka://ClusterSystem@127.0.0.1:2551",
72                     "akka://ClusterSystem@127.0.0.1:2552");
73
74     protected static final List<String> DATACENTER_SEED_NODES =
75             List.of("akka://ClusterSystem@127.0.0.1:2550",
76                     "akka://ClusterSystem@127.0.0.1:2551",
77                     "akka://ClusterSystem@127.0.0.1:2552",
78                     "akka://ClusterSystem@127.0.0.1:2553");
79
80     private static final String REMOTE_PROTOCOL = "akka";
81     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
82     private static final String ROLE_PARAM = "akka.cluster.roles";
83     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
84     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
85
86     protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
87                                                                            final List<String> seedNodes)
88             throws ExecutionException, InterruptedException {
89         final Map<String, Object> overrides = new HashMap<>();
90         overrides.put(PORT_PARAM, port);
91         overrides.put(ROLE_PARAM, roles);
92         if (!seedNodes.isEmpty()) {
93             overrides.put(SEED_NODES_PARAM, seedNodes);
94         }
95
96         final Config config = ConfigFactory.parseMap(overrides)
97                 .withFallback(ConfigFactory.load());
98
99         // Create a classic Akka system since thats what we will have in osgi
100         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
101
102         return new MockNativeEntityOwnershipService(system);
103     }
104
105     protected static ClusterNode startupRemote(final int port, final List<String> roles)
106             throws ExecutionException, InterruptedException {
107         return startup(port, roles, THREE_NODE_SEED_NODES);
108     }
109
110     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
111             throws ExecutionException, InterruptedException {
112         return startup(port, roles, seedNodes);
113     }
114
115     protected static ClusterNode startup(final int port, final List<String> roles)
116             throws ExecutionException, InterruptedException {
117         return startup(port, roles, List.of());
118     }
119
120     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
121             throws ExecutionException, InterruptedException {
122
123         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
124     }
125
126     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
127                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
128             throws ExecutionException, InterruptedException {
129         // Override the configuration
130         final Map<String, Object> overrides = new HashMap<>(4);
131         overrides.put(PORT_PARAM, port);
132         overrides.put(ROLE_PARAM, roles);
133         if (!seedNodes.isEmpty()) {
134             overrides.put(SEED_NODES_PARAM, seedNodes);
135         }
136
137         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
138
139         // Create a classic Akka system since thats what we will have in osgi
140         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
141         final ActorRef<BootstrapCommand> eosBootstrap =
142                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
143
144         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
145                 GetRunningContext::new,
146                 Duration.ofSeconds(5),
147                 Adapter.toTyped(system.scheduler()));
148         final RunningContext runningContext = ask.toCompletableFuture().get();
149
150         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
151                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
152     }
153
154     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
155                                                        final List<String> seedNodes, final String dataCenter)
156             throws ExecutionException, InterruptedException {
157         final Map<String, Object> overrides = new HashMap<>();
158         overrides.put(PORT_PARAM, port);
159         overrides.put(ROLE_PARAM, roles);
160         if (!seedNodes.isEmpty()) {
161             overrides.put(SEED_NODES_PARAM, seedNodes);
162         }
163         overrides.put(DATA_CENTER_PARAM, dataCenter);
164
165         final Config config = ConfigFactory.parseMap(overrides)
166                 .withFallback(ConfigFactory.load());
167
168         // Create a classic Akka system since thats what we will have in osgi
169         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
170         final ActorRef<BootstrapCommand> eosBootstrap =
171                 Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
172
173         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
174                 GetRunningContext::new,
175                 Duration.ofSeconds(5),
176                 Adapter.toTyped(system.scheduler()));
177         final RunningContext runningContext = ask.toCompletableFuture().get();
178
179         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
180                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
181     }
182
183     private static Behavior<BootstrapCommand> rootBehavior() {
184         return Behaviors.setup(context -> EOSMain.create());
185     }
186
187     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
188         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
189         registerCandidates(candidateRegistry, entity, members);
190     }
191
192     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
193                                              final DOMEntity entity, final String... members) {
194         for (final String member : members) {
195             candidateRegistry.tell(new RegisterCandidate(entity, member));
196         }
197     }
198
199     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
200                                                final String... members) {
201         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
202         for (final String member : members) {
203             candidateRegistry.tell(new UnregisterCandidate(entity, member));
204         }
205     }
206
207     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
208         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
209         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
210         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
211
212         return listener;
213     }
214
215     protected static void reachableMember(final ClusterNode node, final String... role) {
216         reachableMember(node.getOwnerSupervisor(), role);
217     }
218
219     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
220                                           final String... role) {
221         ownerSupervisor.tell(new MemberReachableEvent(
222                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
223     }
224
225     protected static void unreachableMember(final ClusterNode node, final String... role) {
226         unreachableMember(node.getOwnerSupervisor(), role);
227     }
228
229     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
230                                             final String... role) {
231         ownerSupervisor.tell(new MemberUnreachableEvent(
232                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
233     }
234
235     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
236         await().until(() -> {
237             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
238             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
239                     AskPattern.ask(distributedData.replicator(),
240                             replyTo -> new Replicator.Get<>(
241                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
242                             Duration.ofSeconds(5),
243                             clusterNode.getActorSystem().scheduler());
244
245             final Replicator.GetResponse<LWWRegister<String>> response =
246                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
247
248             if (response instanceof Replicator.GetSuccess) {
249                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
250                 return !owner.isEmpty();
251             }
252
253             return false;
254         });
255     }
256
257     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
258         final CompletionStage<OwnerSupervisorReply> ask =
259                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
260                         ActivateDataCenter::new,
261                         Duration.ofSeconds(20),
262                         clusterNode.actorSystem.scheduler());
263         return ask.toCompletableFuture();
264     }
265
266     protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
267         final CompletionStage<OwnerSupervisorReply> ask =
268                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
269                         DeactivateDataCenter::new,
270                         Duration.ofSeconds(20),
271                         clusterNode.actorSystem.scheduler());
272         return ask.toCompletableFuture();
273     }
274
275     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
276                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
277         await().until(() -> !listener.getChanges().isEmpty());
278
279         await().untilAsserted(() -> {
280             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
281             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
282             assertEquals(entity, domEntityOwnershipChange.getEntity());
283
284             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
285             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
286             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
287         });
288     }
289
290     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
291         await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
292     }
293
294     protected static final class ClusterNode {
295         private final int port;
296         private final List<String> roles;
297         private final akka.actor.typed.ActorSystem<Void> actorSystem;
298         private final ActorRef<BootstrapCommand> eosBootstrap;
299         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
300         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
301         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
302
303         private ClusterNode(final int port,
304                             final List<String> roles,
305                             final ActorSystem actorSystem,
306                             final ActorRef<BootstrapCommand> eosBootstrap,
307                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
308                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
309                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
310             this.port = port;
311             this.roles = roles;
312             this.actorSystem = Adapter.toTyped(actorSystem);
313             this.eosBootstrap = eosBootstrap;
314             this.listenerRegistry = listenerRegistry;
315             this.candidateRegistry = candidateRegistry;
316             this.ownerSupervisor = ownerSupervisor;
317         }
318
319         public int getPort() {
320             return port;
321         }
322
323         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
324             return actorSystem;
325         }
326
327         public ActorRef<BootstrapCommand> getEosBootstrap() {
328             return eosBootstrap;
329         }
330
331         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
332             return listenerRegistry;
333         }
334
335         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
336             return candidateRegistry;
337         }
338
339         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
340             return ownerSupervisor;
341         }
342
343         public List<String> getRoles() {
344             return roles;
345         }
346     }
347
348     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
349
350         private final Logger log;
351
352         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
353         private final String member;
354
355         public MockEntityOwnershipListener(final String member) {
356             log = LoggerFactory.getLogger("EOS-listener-" + member);
357             this.member = member;
358         }
359
360         @Override
361         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
362             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
363             log.info("{} changes: {}", member, changes.size());
364             changes.add(ownershipChange);
365         }
366
367         public List<DOMEntityOwnershipChange> getChanges() {
368             return changes;
369         }
370     }
371
372     protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
373         private ActorSystem classicActorSystem;
374
375         protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
376                 throws ExecutionException, InterruptedException {
377             super(classicActorSystem);
378             this.classicActorSystem = classicActorSystem;
379         }
380
381         protected void reachableMember(final String... role) {
382             AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
383         }
384
385         public void unreachableMember(final String... role) {
386             AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
387         }
388
389         public ActorSystem getActorSystem() {
390             return classicActorSystem;
391         }
392     }
393 }