2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.ORMap;
23 import akka.cluster.ddata.ORSet;
24 import akka.cluster.ddata.typed.javadsl.DistributedData;
25 import akka.cluster.ddata.typed.javadsl.Replicator;
26 import com.typesafe.config.Config;
27 import com.typesafe.config.ConfigFactory;
28 import java.time.Duration;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.List;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionStage;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Supplier;
39 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
41 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
42 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
49 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
55 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
56 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
57 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
58 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
59 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
60 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
66 public abstract class AbstractNativeEosTest {
68 public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
69 public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
71 protected static final String DEFAULT_DATACENTER = "dc-default";
73 protected static final List<String> TWO_NODE_SEED_NODES =
74 List.of("akka://ClusterSystem@127.0.0.1:2550",
75 "akka://ClusterSystem@127.0.0.1:2551");
77 protected static final List<String> THREE_NODE_SEED_NODES =
78 List.of("akka://ClusterSystem@127.0.0.1:2550",
79 "akka://ClusterSystem@127.0.0.1:2551",
80 "akka://ClusterSystem@127.0.0.1:2552");
82 protected static final List<String> DATACENTER_SEED_NODES =
83 List.of("akka://ClusterSystem@127.0.0.1:2550",
84 "akka://ClusterSystem@127.0.0.1:2551",
85 "akka://ClusterSystem@127.0.0.1:2552",
86 "akka://ClusterSystem@127.0.0.1:2553");
88 private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
90 protected static BindingCodecContext CODEC_CONTEXT
91 = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
93 private static final String REMOTE_PROTOCOL = "akka";
94 private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
95 private static final String ROLE_PARAM = "akka.cluster.roles";
96 private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
97 private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
99 protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
100 final List<String> seedNodes)
101 throws ExecutionException, InterruptedException {
102 final Map<String, Object> overrides = new HashMap<>();
103 overrides.put(PORT_PARAM, port);
104 overrides.put(ROLE_PARAM, roles);
105 if (!seedNodes.isEmpty()) {
106 overrides.put(SEED_NODES_PARAM, seedNodes);
109 final Config config = ConfigFactory.parseMap(overrides)
110 .withFallback(ConfigFactory.load());
112 // Create a classic Akka system since thats what we will have in osgi
113 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
115 return new MockNativeEntityOwnershipService(system);
118 protected static ClusterNode startupRemote(final int port, final List<String> roles)
119 throws ExecutionException, InterruptedException {
120 return startup(port, roles, THREE_NODE_SEED_NODES);
123 protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
124 throws ExecutionException, InterruptedException {
125 return startup(port, roles, seedNodes);
128 protected static ClusterNode startup(final int port, final List<String> roles)
129 throws ExecutionException, InterruptedException {
130 return startup(port, roles, List.of());
133 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
134 throws ExecutionException, InterruptedException {
136 return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
139 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
140 final Supplier<Behavior<BootstrapCommand>> bootstrap)
141 throws ExecutionException, InterruptedException {
142 // Override the configuration
143 final Map<String, Object> overrides = new HashMap<>(4);
144 overrides.put(PORT_PARAM, port);
145 overrides.put(ROLE_PARAM, roles);
146 if (!seedNodes.isEmpty()) {
147 overrides.put(SEED_NODES_PARAM, seedNodes);
150 final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
152 // Create a classic Akka system since thats what we will have in osgi
153 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
154 final ActorRef<BootstrapCommand> eosBootstrap =
155 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
157 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
158 GetRunningContext::new,
159 Duration.ofSeconds(5),
160 Adapter.toTyped(system.scheduler()));
161 final RunningContext runningContext = ask.toCompletableFuture().get();
163 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
164 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
167 protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
168 final List<String> seedNodes, final String dataCenter)
169 throws ExecutionException, InterruptedException {
170 final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
171 final ActorRef<BootstrapCommand> eosBootstrap =
172 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
174 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
175 GetRunningContext::new,
176 Duration.ofSeconds(5),
177 Adapter.toTyped(system.scheduler()));
178 final RunningContext runningContext = ask.toCompletableFuture().get();
180 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
181 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
184 protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
185 final List<String> seedNodes) {
186 final Map<String, Object> overrides = new HashMap<>();
187 overrides.put(PORT_PARAM, port);
188 overrides.put(ROLE_PARAM, roles);
189 if (!seedNodes.isEmpty()) {
190 overrides.put(SEED_NODES_PARAM, seedNodes);
193 final Config config = ConfigFactory.parseMap(overrides)
194 .withFallback(ConfigFactory.load());
196 // Create a classic Akka system since thats what we will have in osgi
197 return akka.actor.ActorSystem.create("ClusterSystem", config);
200 protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
201 final List<String> seedNodes, final String dataCenter) {
202 final Map<String, Object> overrides = new HashMap<>();
203 overrides.put(PORT_PARAM, port);
204 overrides.put(ROLE_PARAM, roles);
205 if (!seedNodes.isEmpty()) {
206 overrides.put(SEED_NODES_PARAM, seedNodes);
208 overrides.put(DATA_CENTER_PARAM, dataCenter);
210 final Config config = ConfigFactory.parseMap(overrides)
211 .withFallback(ConfigFactory.load());
213 // Create a classic Akka system since thats what we will have in osgi
214 return akka.actor.ActorSystem.create("ClusterSystem", config);
217 private static Behavior<BootstrapCommand> rootBehavior() {
218 return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
221 protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
222 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
223 registerCandidates(candidateRegistry, entity, members);
226 protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
227 final DOMEntity entity, final String... members) {
228 for (final String member : members) {
229 candidateRegistry.tell(new RegisterCandidate(entity, member));
233 protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
234 final String... members) {
235 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
236 for (final String member : members) {
237 candidateRegistry.tell(new UnregisterCandidate(entity, member));
241 protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
242 final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
243 final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
244 listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
249 protected static void reachableMember(final ClusterNode node, final String... role) {
250 reachableMember(node.getOwnerSupervisor(), role);
253 protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
254 final String... role) {
255 ownerSupervisor.tell(new MemberReachableEvent(
256 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
259 protected static void unreachableMember(final ClusterNode node, final String... role) {
260 unreachableMember(node.getOwnerSupervisor(), role);
263 protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
264 final String... role) {
265 ownerSupervisor.tell(new MemberUnreachableEvent(
266 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
269 protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
270 await().atMost(Duration.ofSeconds(15)).until(() -> {
271 final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
272 final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
273 AskPattern.ask(distributedData.replicator(),
274 replyTo -> new Replicator.Get<>(
275 new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
276 Duration.ofSeconds(5),
277 clusterNode.getActorSystem().scheduler());
279 final Replicator.GetResponse<LWWRegister<String>> response =
280 ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
282 if (response instanceof Replicator.GetSuccess) {
283 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
284 return !owner.isEmpty();
291 protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
292 final String candidate) {
293 await().atMost(Duration.ofSeconds(15)).until(() -> {
294 final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
296 final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
297 AskPattern.ask(distributedData.replicator(),
298 replyTo -> new Replicator.Get<>(
299 CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
300 Duration.ofSeconds(5),
301 clusterNode.getActorSystem().scheduler());
303 final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
304 ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
306 if (response instanceof Replicator.GetSuccess) {
307 final Map<DOMEntity, ORSet<String>> entries =
308 ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
310 return entries.get(entity).contains(candidate);
317 protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
318 final CompletionStage<OwnerSupervisorReply> ask =
319 AskPattern.ask(clusterNode.getOwnerSupervisor(),
320 ActivateDataCenter::new,
321 Duration.ofSeconds(20),
322 clusterNode.actorSystem.scheduler());
323 return ask.toCompletableFuture();
326 protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
327 final CompletionStage<OwnerSupervisorReply> ask =
328 AskPattern.ask(clusterNode.getOwnerSupervisor(),
329 DeactivateDataCenter::new,
330 Duration.ofSeconds(20),
331 clusterNode.actorSystem.scheduler());
332 return ask.toCompletableFuture();
335 protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
336 final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
337 await().until(() -> !listener.getChanges().isEmpty());
339 await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
340 final var changes = listener.getChanges();
341 final var domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
342 assertEquals(entity, domEntityOwnershipChange.entity());
344 assertEquals(hasOwner, domEntityOwnershipChange.change().hasOwner());
345 assertEquals(isOwner, domEntityOwnershipChange.change().isOwner());
346 assertEquals(wasOwner, domEntityOwnershipChange.change().wasOwner());
350 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
351 verifyNoNotifications(listener, 2);
354 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
355 await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
358 protected static void verifyNoAdditionalNotifications(
359 final MockEntityOwnershipListener listener, final long delaySeconds) {
360 listener.resetListener();
361 verifyNoNotifications(listener, delaySeconds);
364 protected static final class ClusterNode {
365 private final int port;
366 private final List<String> roles;
367 private final akka.actor.typed.ActorSystem<Void> actorSystem;
368 private final ActorRef<BootstrapCommand> eosBootstrap;
369 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
370 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
371 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
373 private ClusterNode(final int port,
374 final List<String> roles,
375 final ActorSystem actorSystem,
376 final ActorRef<BootstrapCommand> eosBootstrap,
377 final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
378 final ActorRef<CandidateRegistryCommand> candidateRegistry,
379 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
382 this.actorSystem = Adapter.toTyped(actorSystem);
383 this.eosBootstrap = eosBootstrap;
384 this.listenerRegistry = listenerRegistry;
385 this.candidateRegistry = candidateRegistry;
386 this.ownerSupervisor = ownerSupervisor;
389 public int getPort() {
393 public akka.actor.typed.ActorSystem<Void> getActorSystem() {
397 public ActorRef<BootstrapCommand> getEosBootstrap() {
401 public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
402 return listenerRegistry;
405 public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
406 return candidateRegistry;
409 public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
410 return ownerSupervisor;
413 public List<String> getRoles() {
418 protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
419 private final List<EntityOwnerChanged> changes = new ArrayList<>();
420 private final String member;
421 private final Logger log;
423 public MockEntityOwnershipListener(final String member) {
424 log = LoggerFactory.getLogger("EOS-listener-" + member);
425 this.member = member;
429 public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
430 final boolean inJeopardy) {
431 final var changed = new EntityOwnerChanged(entity, change, inJeopardy);
432 log.info("{} Received ownershipCHanged: {}", member, changed);
433 log.info("{} changes: {}", member, changes.size());
434 changes.add(changed);
437 public List<EntityOwnerChanged> getChanges() {
441 public void resetListener() {
446 protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
447 private final ActorSystem classicActorSystem;
449 protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
450 throws ExecutionException, InterruptedException {
451 super(classicActorSystem, CODEC_CONTEXT);
452 this.classicActorSystem = classicActorSystem;
455 protected void reachableMember(final String... role) {
456 AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
459 public void unreachableMember(final String... role) {
460 AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
463 public ActorSystem getActorSystem() {
464 return classicActorSystem;