Add cluster-admin api for datacenter activation
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.function.Supplier;
37 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
48 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
50 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
51 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
53 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
54 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public abstract class AbstractNativeEosTest {
59
60     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
61     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
62
63     protected static final String DEFAULT_DATACENTER = "dc-default";
64
65     protected static final List<String> TWO_NODE_SEED_NODES =
66             List.of("akka://ClusterSystem@127.0.0.1:2550",
67                     "akka://ClusterSystem@127.0.0.1:2551");
68
69     protected static final List<String> THREE_NODE_SEED_NODES =
70             List.of("akka://ClusterSystem@127.0.0.1:2550",
71                     "akka://ClusterSystem@127.0.0.1:2551",
72                     "akka://ClusterSystem@127.0.0.1:2552");
73
74     protected static final List<String> DATACENTER_SEED_NODES =
75             List.of("akka://ClusterSystem@127.0.0.1:2550",
76                     "akka://ClusterSystem@127.0.0.1:2551",
77                     "akka://ClusterSystem@127.0.0.1:2552",
78                     "akka://ClusterSystem@127.0.0.1:2553");
79
80     private static final String REMOTE_PROTOCOL = "akka";
81     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
82     private static final String ROLE_PARAM = "akka.cluster.roles";
83     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
84     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
85
86
87     protected static ClusterNode startupRemote(final int port, final List<String> roles)
88             throws ExecutionException, InterruptedException {
89         return startup(port, roles, THREE_NODE_SEED_NODES);
90     }
91
92     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
93             throws ExecutionException, InterruptedException {
94         return startup(port, roles, seedNodes);
95     }
96
97     protected static ClusterNode startup(final int port, final List<String> roles)
98             throws ExecutionException, InterruptedException {
99         return startup(port, roles, List.of());
100     }
101
102     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
103             throws ExecutionException, InterruptedException {
104
105         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
106     }
107
108     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
109                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
110             throws ExecutionException, InterruptedException {
111         // Override the configuration
112         final Map<String, Object> overrides = new HashMap<>(4);
113         overrides.put(PORT_PARAM, port);
114         overrides.put(ROLE_PARAM, roles);
115         if (!seedNodes.isEmpty()) {
116             overrides.put(SEED_NODES_PARAM, seedNodes);
117         }
118
119         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
120
121         // Create a classic Akka system since thats what we will have in osgi
122         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
123         final ActorRef<BootstrapCommand> eosBootstrap =
124                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
125
126         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
127                 GetRunningContext::new,
128                 Duration.ofSeconds(5),
129                 Adapter.toTyped(system.scheduler()));
130         final RunningContext runningContext = ask.toCompletableFuture().get();
131
132         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
133                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
134     }
135
136     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
137                                                        final List<String> seedNodes, final String dataCenter)
138             throws ExecutionException, InterruptedException {
139         final Map<String, Object> overrides = new HashMap<>();
140         overrides.put(PORT_PARAM, port);
141         overrides.put(ROLE_PARAM, roles);
142         if (!seedNodes.isEmpty()) {
143             overrides.put(SEED_NODES_PARAM, seedNodes);
144         }
145         overrides.put(DATA_CENTER_PARAM, dataCenter);
146
147         final Config config = ConfigFactory.parseMap(overrides)
148                 .withFallback(ConfigFactory.load());
149
150         // Create a classic Akka system since thats what we will have in osgi
151         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
152         final ActorRef<BootstrapCommand> eosBootstrap =
153                 Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
154
155         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
156                 GetRunningContext::new,
157                 Duration.ofSeconds(5),
158                 Adapter.toTyped(system.scheduler()));
159         final RunningContext runningContext = ask.toCompletableFuture().get();
160
161         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
162                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
163     }
164
165     private static Behavior<BootstrapCommand> rootBehavior() {
166         return Behaviors.setup(context -> EOSMain.create());
167     }
168
169     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
170         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
171         registerCandidates(candidateRegistry, entity, members);
172     }
173
174     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
175                                              final DOMEntity entity, final String... members) {
176         for (final String member : members) {
177             candidateRegistry.tell(new RegisterCandidate(entity, member));
178         }
179     }
180
181     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
182                                                final String... members) {
183         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
184         for (final String member : members) {
185             candidateRegistry.tell(new UnregisterCandidate(entity, member));
186         }
187     }
188
189     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
190         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
191         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
192         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
193
194         return listener;
195     }
196
197     protected static void reachableMember(final ClusterNode node, final String... role) {
198         reachableMember(node.getOwnerSupervisor(), role);
199     }
200
201     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
202                                           final String... role) {
203         ownerSupervisor.tell(new MemberReachableEvent(
204                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
205     }
206
207     protected static void unreachableMember(final ClusterNode node, final String... role) {
208         unreachableMember(node.getOwnerSupervisor(), role);
209     }
210
211     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
212                                             final String... role) {
213         ownerSupervisor.tell(new MemberUnreachableEvent(
214                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
215     }
216
217     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
218         await().until(() -> {
219             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
220             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
221                     AskPattern.ask(distributedData.replicator(),
222                             replyTo -> new Replicator.Get<>(
223                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
224                             Duration.ofSeconds(5),
225                             clusterNode.getActorSystem().scheduler());
226
227             final Replicator.GetResponse<LWWRegister<String>> response =
228                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
229
230             if (response instanceof Replicator.GetSuccess) {
231                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
232                 return !owner.isEmpty();
233             }
234
235             return false;
236         });
237     }
238
239     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
240         final CompletionStage<OwnerSupervisorReply> ask =
241                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
242                         ActivateDataCenter::new,
243                         Duration.ofSeconds(20),
244                         clusterNode.actorSystem.scheduler());
245         return ask.toCompletableFuture();
246     }
247
248     protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
249         final CompletionStage<OwnerSupervisorReply> ask =
250                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
251                         DeactivateDataCenter::new,
252                         Duration.ofSeconds(20),
253                         clusterNode.actorSystem.scheduler());
254         return ask.toCompletableFuture();
255     }
256
257     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
258                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
259         await().until(() -> !listener.getChanges().isEmpty());
260
261         await().untilAsserted(() -> {
262             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
263             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
264             assertEquals(entity, domEntityOwnershipChange.getEntity());
265
266             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
267             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
268             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
269         });
270     }
271
272     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
273         await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
274     }
275
276     protected static final class ClusterNode {
277         private final int port;
278         private final List<String> roles;
279         private final akka.actor.typed.ActorSystem<Void> actorSystem;
280         private final ActorRef<BootstrapCommand> eosBootstrap;
281         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
282         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
283         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
284
285         private ClusterNode(final int port,
286                             final List<String> roles,
287                             final ActorSystem actorSystem,
288                             final ActorRef<BootstrapCommand> eosBootstrap,
289                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
290                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
291                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
292             this.port = port;
293             this.roles = roles;
294             this.actorSystem = Adapter.toTyped(actorSystem);
295             this.eosBootstrap = eosBootstrap;
296             this.listenerRegistry = listenerRegistry;
297             this.candidateRegistry = candidateRegistry;
298             this.ownerSupervisor = ownerSupervisor;
299         }
300
301         public int getPort() {
302             return port;
303         }
304
305         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
306             return actorSystem;
307         }
308
309         public ActorRef<BootstrapCommand> getEosBootstrap() {
310             return eosBootstrap;
311         }
312
313         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
314             return listenerRegistry;
315         }
316
317         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
318             return candidateRegistry;
319         }
320
321         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
322             return ownerSupervisor;
323         }
324
325         public List<String> getRoles() {
326             return roles;
327         }
328     }
329
330     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
331
332         private final Logger log;
333
334         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
335         private final String member;
336
337         public MockEntityOwnershipListener(final String member) {
338             log = LoggerFactory.getLogger("EOS-listener-" + member);
339             this.member = member;
340         }
341
342         @Override
343         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
344             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
345             log.info("{} changes: {}", member, changes.size());
346             changes.add(ownershipChange);
347         }
348
349         public List<DOMEntityOwnershipChange> getChanges() {
350             return changes;
351         }
352     }
353 }