Use akka-multi-dc in eos native
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Supplier;
36 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
37 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
45 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
46 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
48 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
49 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
50 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
51 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public abstract class AbstractNativeEosTest {
57
58     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
59     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
60
61     protected static final String DEFAULT_DATACENTER = "dc-default";
62
63     protected static final List<String> TWO_NODE_SEED_NODES =
64             List.of("akka://ClusterSystem@127.0.0.1:2550",
65                     "akka://ClusterSystem@127.0.0.1:2551");
66
67     protected static final List<String> THREE_NODE_SEED_NODES =
68             List.of("akka://ClusterSystem@127.0.0.1:2550",
69                     "akka://ClusterSystem@127.0.0.1:2551",
70                     "akka://ClusterSystem@127.0.0.1:2552");
71
72     protected static final List<String> DATACENTER_SEED_NODES =
73             List.of("akka://ClusterSystem@127.0.0.1:2550",
74                     "akka://ClusterSystem@127.0.0.1:2551",
75                     "akka://ClusterSystem@127.0.0.1:2552",
76                     "akka://ClusterSystem@127.0.0.1:2553");
77
78     private static final String REMOTE_PROTOCOL = "akka";
79     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
80     private static final String ROLE_PARAM = "akka.cluster.roles";
81     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
82     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
83
84
85     protected static ClusterNode startupRemote(final int port, final List<String> roles)
86             throws ExecutionException, InterruptedException {
87         return startup(port, roles, THREE_NODE_SEED_NODES);
88     }
89
90     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
91             throws ExecutionException, InterruptedException {
92         return startup(port, roles, seedNodes);
93     }
94
95     protected static ClusterNode startup(final int port, final List<String> roles)
96             throws ExecutionException, InterruptedException {
97         return startup(port, roles, List.of());
98     }
99
100     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
101             throws ExecutionException, InterruptedException {
102
103         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
104     }
105
106     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
107                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
108             throws ExecutionException, InterruptedException {
109         // Override the configuration
110         final Map<String, Object> overrides = new HashMap<>(4);
111         overrides.put(PORT_PARAM, port);
112         overrides.put(ROLE_PARAM, roles);
113         if (!seedNodes.isEmpty()) {
114             overrides.put(SEED_NODES_PARAM, seedNodes);
115         }
116
117         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
118
119         // Create a classic Akka system since thats what we will have in osgi
120         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
121         final ActorRef<BootstrapCommand> eosBootstrap =
122                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
123
124         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
125                 GetRunningContext::new,
126                 Duration.ofSeconds(5),
127                 Adapter.toTyped(system.scheduler()));
128         final RunningContext runningContext = ask.toCompletableFuture().get();
129
130         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
131                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
132     }
133
134     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
135                                                        final List<String> seedNodes, final String dataCenter)
136             throws ExecutionException, InterruptedException {
137         final Map<String, Object> overrides = new HashMap<>();
138         overrides.put(PORT_PARAM, port);
139         overrides.put(ROLE_PARAM, roles);
140         if (!seedNodes.isEmpty()) {
141             overrides.put(SEED_NODES_PARAM, seedNodes);
142         }
143         overrides.put(DATA_CENTER_PARAM, dataCenter);
144
145         final Config config = ConfigFactory.parseMap(overrides)
146                 .withFallback(ConfigFactory.load());
147
148         // Create a classic Akka system since thats what we will have in osgi
149         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
150         final ActorRef<BootstrapCommand> eosBootstrap =
151                 Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
152
153         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
154                 GetRunningContext::new,
155                 Duration.ofSeconds(5),
156                 Adapter.toTyped(system.scheduler()));
157         final RunningContext runningContext = ask.toCompletableFuture().get();
158
159         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
160                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
161     }
162
163     private static Behavior<BootstrapCommand> rootBehavior() {
164         return Behaviors.setup(context -> EOSMain.create());
165     }
166
167     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
168         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
169         registerCandidates(candidateRegistry, entity, members);
170     }
171
172     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
173                                              final DOMEntity entity, final String... members) {
174         for (final String member : members) {
175             candidateRegistry.tell(new RegisterCandidate(entity, member));
176         }
177     }
178
179     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
180                                                final String... members) {
181         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
182         for (final String member : members) {
183             candidateRegistry.tell(new UnregisterCandidate(entity, member));
184         }
185     }
186
187     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
188         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
189         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
190         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
191
192         return listener;
193     }
194
195     protected static void reachableMember(final ClusterNode node, final String... role) {
196         reachableMember(node.getOwnerSupervisor(), role);
197     }
198
199     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
200                                           final String... role) {
201         ownerSupervisor.tell(new MemberReachableEvent(
202                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
203     }
204
205     protected static void unreachableMember(final ClusterNode node, final String... role) {
206         unreachableMember(node.getOwnerSupervisor(), role);
207     }
208
209     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
210                                             final String... role) {
211         ownerSupervisor.tell(new MemberUnreachableEvent(
212                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
213     }
214
215     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
216         await().until(() -> {
217             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
218             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
219                     AskPattern.ask(distributedData.replicator(),
220                             replyTo -> new Replicator.Get<>(
221                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
222                             Duration.ofSeconds(5),
223                             clusterNode.getActorSystem().scheduler());
224
225             final Replicator.GetResponse<LWWRegister<String>> response =
226                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
227
228             if (response instanceof Replicator.GetSuccess) {
229                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
230                 return !owner.isEmpty();
231             }
232
233             return false;
234         });
235     }
236
237     protected static void activateDatacenter(final ClusterNode clusterNode) {
238         clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
239     }
240
241     protected static void deactivateDatacenter(final ClusterNode clusterNode) {
242         clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
243     }
244
245     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
246                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
247         await().until(() -> !listener.getChanges().isEmpty());
248
249         await().untilAsserted(() -> {
250             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
251             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
252             assertEquals(entity, domEntityOwnershipChange.getEntity());
253
254             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
255             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
256             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
257         });
258     }
259
260     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
261         await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
262     }
263
264     protected static final class ClusterNode {
265         private final int port;
266         private final List<String> roles;
267         private final akka.actor.typed.ActorSystem<Void> actorSystem;
268         private final ActorRef<BootstrapCommand> eosBootstrap;
269         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
270         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
271         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
272
273         private ClusterNode(final int port,
274                             final List<String> roles,
275                             final ActorSystem actorSystem,
276                             final ActorRef<BootstrapCommand> eosBootstrap,
277                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
278                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
279                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
280             this.port = port;
281             this.roles = roles;
282             this.actorSystem = Adapter.toTyped(actorSystem);
283             this.eosBootstrap = eosBootstrap;
284             this.listenerRegistry = listenerRegistry;
285             this.candidateRegistry = candidateRegistry;
286             this.ownerSupervisor = ownerSupervisor;
287         }
288
289         public int getPort() {
290             return port;
291         }
292
293         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
294             return actorSystem;
295         }
296
297         public ActorRef<BootstrapCommand> getEosBootstrap() {
298             return eosBootstrap;
299         }
300
301         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
302             return listenerRegistry;
303         }
304
305         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
306             return candidateRegistry;
307         }
308
309         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
310             return ownerSupervisor;
311         }
312
313         public List<String> getRoles() {
314             return roles;
315         }
316     }
317
318     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
319
320         private final Logger log;
321
322         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
323         private final String member;
324
325         public MockEntityOwnershipListener(final String member) {
326             log = LoggerFactory.getLogger("EOS-listener-" + member);
327             this.member = member;
328         }
329
330         @Override
331         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
332             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
333             log.info("{} changes: {}", member, changes.size());
334             changes.add(ownershipChange);
335         }
336
337         public List<DOMEntityOwnershipChange> getChanges() {
338             return changes;
339         }
340     }
341 }