2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.function.Supplier;
37 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
48 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
50 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
51 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
52 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
53 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
54 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
55 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
56 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
57 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
58 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 public abstract class AbstractNativeEosTest {
64 public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
65 public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
67 protected static final String DEFAULT_DATACENTER = "dc-default";
69 protected static final List<String> TWO_NODE_SEED_NODES =
70 List.of("akka://ClusterSystem@127.0.0.1:2550",
71 "akka://ClusterSystem@127.0.0.1:2551");
73 protected static final List<String> THREE_NODE_SEED_NODES =
74 List.of("akka://ClusterSystem@127.0.0.1:2550",
75 "akka://ClusterSystem@127.0.0.1:2551",
76 "akka://ClusterSystem@127.0.0.1:2552");
78 protected static final List<String> DATACENTER_SEED_NODES =
79 List.of("akka://ClusterSystem@127.0.0.1:2550",
80 "akka://ClusterSystem@127.0.0.1:2551",
81 "akka://ClusterSystem@127.0.0.1:2552",
82 "akka://ClusterSystem@127.0.0.1:2553");
84 private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
86 protected static BindingCodecContext CODEC_CONTEXT
87 = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
89 private static final String REMOTE_PROTOCOL = "akka";
90 private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
91 private static final String ROLE_PARAM = "akka.cluster.roles";
92 private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
93 private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
95 protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
96 final List<String> seedNodes)
97 throws ExecutionException, InterruptedException {
98 final Map<String, Object> overrides = new HashMap<>();
99 overrides.put(PORT_PARAM, port);
100 overrides.put(ROLE_PARAM, roles);
101 if (!seedNodes.isEmpty()) {
102 overrides.put(SEED_NODES_PARAM, seedNodes);
105 final Config config = ConfigFactory.parseMap(overrides)
106 .withFallback(ConfigFactory.load());
108 // Create a classic Akka system since thats what we will have in osgi
109 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
111 return new MockNativeEntityOwnershipService(system);
114 protected static ClusterNode startupRemote(final int port, final List<String> roles)
115 throws ExecutionException, InterruptedException {
116 return startup(port, roles, THREE_NODE_SEED_NODES);
119 protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
120 throws ExecutionException, InterruptedException {
121 return startup(port, roles, seedNodes);
124 protected static ClusterNode startup(final int port, final List<String> roles)
125 throws ExecutionException, InterruptedException {
126 return startup(port, roles, List.of());
129 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
130 throws ExecutionException, InterruptedException {
132 return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
135 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
136 final Supplier<Behavior<BootstrapCommand>> bootstrap)
137 throws ExecutionException, InterruptedException {
138 // Override the configuration
139 final Map<String, Object> overrides = new HashMap<>(4);
140 overrides.put(PORT_PARAM, port);
141 overrides.put(ROLE_PARAM, roles);
142 if (!seedNodes.isEmpty()) {
143 overrides.put(SEED_NODES_PARAM, seedNodes);
146 final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
148 // Create a classic Akka system since thats what we will have in osgi
149 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
150 final ActorRef<BootstrapCommand> eosBootstrap =
151 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
153 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
154 GetRunningContext::new,
155 Duration.ofSeconds(5),
156 Adapter.toTyped(system.scheduler()));
157 final RunningContext runningContext = ask.toCompletableFuture().get();
159 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
160 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
163 protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
164 final List<String> seedNodes, final String dataCenter)
165 throws ExecutionException, InterruptedException {
166 final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
167 final ActorRef<BootstrapCommand> eosBootstrap =
168 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
170 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
171 GetRunningContext::new,
172 Duration.ofSeconds(5),
173 Adapter.toTyped(system.scheduler()));
174 final RunningContext runningContext = ask.toCompletableFuture().get();
176 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
177 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
180 protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
181 final List<String> seedNodes) {
182 final Map<String, Object> overrides = new HashMap<>();
183 overrides.put(PORT_PARAM, port);
184 overrides.put(ROLE_PARAM, roles);
185 if (!seedNodes.isEmpty()) {
186 overrides.put(SEED_NODES_PARAM, seedNodes);
189 final Config config = ConfigFactory.parseMap(overrides)
190 .withFallback(ConfigFactory.load());
192 // Create a classic Akka system since thats what we will have in osgi
193 return akka.actor.ActorSystem.create("ClusterSystem", config);
196 protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
197 final List<String> seedNodes, final String dataCenter) {
198 final Map<String, Object> overrides = new HashMap<>();
199 overrides.put(PORT_PARAM, port);
200 overrides.put(ROLE_PARAM, roles);
201 if (!seedNodes.isEmpty()) {
202 overrides.put(SEED_NODES_PARAM, seedNodes);
204 overrides.put(DATA_CENTER_PARAM, dataCenter);
206 final Config config = ConfigFactory.parseMap(overrides)
207 .withFallback(ConfigFactory.load());
209 // Create a classic Akka system since thats what we will have in osgi
210 return akka.actor.ActorSystem.create("ClusterSystem", config);
213 private static Behavior<BootstrapCommand> rootBehavior() {
214 return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
217 protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
218 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
219 registerCandidates(candidateRegistry, entity, members);
222 protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
223 final DOMEntity entity, final String... members) {
224 for (final String member : members) {
225 candidateRegistry.tell(new RegisterCandidate(entity, member));
229 protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
230 final String... members) {
231 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
232 for (final String member : members) {
233 candidateRegistry.tell(new UnregisterCandidate(entity, member));
237 protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
238 final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
239 final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
240 listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
245 protected static void reachableMember(final ClusterNode node, final String... role) {
246 reachableMember(node.getOwnerSupervisor(), role);
249 protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
250 final String... role) {
251 ownerSupervisor.tell(new MemberReachableEvent(
252 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
255 protected static void unreachableMember(final ClusterNode node, final String... role) {
256 unreachableMember(node.getOwnerSupervisor(), role);
259 protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
260 final String... role) {
261 ownerSupervisor.tell(new MemberUnreachableEvent(
262 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
265 protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
266 await().until(() -> {
267 final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
268 final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
269 AskPattern.ask(distributedData.replicator(),
270 replyTo -> new Replicator.Get<>(
271 new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
272 Duration.ofSeconds(5),
273 clusterNode.getActorSystem().scheduler());
275 final Replicator.GetResponse<LWWRegister<String>> response =
276 ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
278 if (response instanceof Replicator.GetSuccess) {
279 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
280 return !owner.isEmpty();
287 protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
288 final CompletionStage<OwnerSupervisorReply> ask =
289 AskPattern.ask(clusterNode.getOwnerSupervisor(),
290 ActivateDataCenter::new,
291 Duration.ofSeconds(20),
292 clusterNode.actorSystem.scheduler());
293 return ask.toCompletableFuture();
296 protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
297 final CompletionStage<OwnerSupervisorReply> ask =
298 AskPattern.ask(clusterNode.getOwnerSupervisor(),
299 DeactivateDataCenter::new,
300 Duration.ofSeconds(20),
301 clusterNode.actorSystem.scheduler());
302 return ask.toCompletableFuture();
305 protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
306 final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
307 await().until(() -> !listener.getChanges().isEmpty());
309 await().untilAsserted(() -> {
310 final List<DOMEntityOwnershipChange> changes = listener.getChanges();
311 final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
312 assertEquals(entity, domEntityOwnershipChange.getEntity());
314 assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
315 assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
316 assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
320 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
321 verifyNoNotifications(listener, 2);
324 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
325 await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
328 protected static void verifyNoAdditionalNotifications(
329 final MockEntityOwnershipListener listener, final long delaySeconds) {
330 listener.resetListener();
331 verifyNoNotifications(listener, delaySeconds);
334 protected static final class ClusterNode {
335 private final int port;
336 private final List<String> roles;
337 private final akka.actor.typed.ActorSystem<Void> actorSystem;
338 private final ActorRef<BootstrapCommand> eosBootstrap;
339 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
340 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
341 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
343 private ClusterNode(final int port,
344 final List<String> roles,
345 final ActorSystem actorSystem,
346 final ActorRef<BootstrapCommand> eosBootstrap,
347 final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
348 final ActorRef<CandidateRegistryCommand> candidateRegistry,
349 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
352 this.actorSystem = Adapter.toTyped(actorSystem);
353 this.eosBootstrap = eosBootstrap;
354 this.listenerRegistry = listenerRegistry;
355 this.candidateRegistry = candidateRegistry;
356 this.ownerSupervisor = ownerSupervisor;
359 public int getPort() {
363 public akka.actor.typed.ActorSystem<Void> getActorSystem() {
367 public ActorRef<BootstrapCommand> getEosBootstrap() {
371 public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
372 return listenerRegistry;
375 public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
376 return candidateRegistry;
379 public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
380 return ownerSupervisor;
383 public List<String> getRoles() {
388 protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
390 private final Logger log;
392 private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
393 private final String member;
395 public MockEntityOwnershipListener(final String member) {
396 log = LoggerFactory.getLogger("EOS-listener-" + member);
397 this.member = member;
401 public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
402 log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
403 log.info("{} changes: {}", member, changes.size());
404 changes.add(ownershipChange);
407 public List<DOMEntityOwnershipChange> getChanges() {
411 public void resetListener() {
416 protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
417 private final ActorSystem classicActorSystem;
419 protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
420 throws ExecutionException, InterruptedException {
421 super(classicActorSystem, CODEC_CONTEXT);
422 this.classicActorSystem = classicActorSystem;
425 protected void reachableMember(final String... role) {
426 AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
429 public void unreachableMember(final String... role) {
430 AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
433 public ActorSystem getActorSystem() {
434 return classicActorSystem;