2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.function.Supplier;
37 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
40 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
47 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
48 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
50 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
51 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
52 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
53 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
54 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
55 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
56 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
57 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
58 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 public abstract class AbstractNativeEosTest {
64 public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
65 public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
67 protected static final String DEFAULT_DATACENTER = "dc-default";
69 protected static final List<String> TWO_NODE_SEED_NODES =
70 List.of("akka://ClusterSystem@127.0.0.1:2550",
71 "akka://ClusterSystem@127.0.0.1:2551");
73 protected static final List<String> THREE_NODE_SEED_NODES =
74 List.of("akka://ClusterSystem@127.0.0.1:2550",
75 "akka://ClusterSystem@127.0.0.1:2551",
76 "akka://ClusterSystem@127.0.0.1:2552");
78 protected static final List<String> DATACENTER_SEED_NODES =
79 List.of("akka://ClusterSystem@127.0.0.1:2550",
80 "akka://ClusterSystem@127.0.0.1:2551",
81 "akka://ClusterSystem@127.0.0.1:2552",
82 "akka://ClusterSystem@127.0.0.1:2553");
84 private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
86 protected static BindingCodecContext CODEC_CONTEXT
87 = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
89 private static final String REMOTE_PROTOCOL = "akka";
90 private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
91 private static final String ROLE_PARAM = "akka.cluster.roles";
92 private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
93 private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
95 protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
96 final List<String> seedNodes)
97 throws ExecutionException, InterruptedException {
98 final Map<String, Object> overrides = new HashMap<>();
99 overrides.put(PORT_PARAM, port);
100 overrides.put(ROLE_PARAM, roles);
101 if (!seedNodes.isEmpty()) {
102 overrides.put(SEED_NODES_PARAM, seedNodes);
105 final Config config = ConfigFactory.parseMap(overrides)
106 .withFallback(ConfigFactory.load());
108 // Create a classic Akka system since thats what we will have in osgi
109 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
111 return new MockNativeEntityOwnershipService(system);
114 protected static ClusterNode startupRemote(final int port, final List<String> roles)
115 throws ExecutionException, InterruptedException {
116 return startup(port, roles, THREE_NODE_SEED_NODES);
119 protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
120 throws ExecutionException, InterruptedException {
121 return startup(port, roles, seedNodes);
124 protected static ClusterNode startup(final int port, final List<String> roles)
125 throws ExecutionException, InterruptedException {
126 return startup(port, roles, List.of());
129 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
130 throws ExecutionException, InterruptedException {
132 return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
135 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
136 final Supplier<Behavior<BootstrapCommand>> bootstrap)
137 throws ExecutionException, InterruptedException {
138 // Override the configuration
139 final Map<String, Object> overrides = new HashMap<>(4);
140 overrides.put(PORT_PARAM, port);
141 overrides.put(ROLE_PARAM, roles);
142 if (!seedNodes.isEmpty()) {
143 overrides.put(SEED_NODES_PARAM, seedNodes);
146 final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
148 // Create a classic Akka system since thats what we will have in osgi
149 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
150 final ActorRef<BootstrapCommand> eosBootstrap =
151 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
153 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
154 GetRunningContext::new,
155 Duration.ofSeconds(5),
156 Adapter.toTyped(system.scheduler()));
157 final RunningContext runningContext = ask.toCompletableFuture().get();
159 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
160 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
163 protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
164 final List<String> seedNodes, final String dataCenter)
165 throws ExecutionException, InterruptedException {
166 final Map<String, Object> overrides = new HashMap<>();
167 overrides.put(PORT_PARAM, port);
168 overrides.put(ROLE_PARAM, roles);
169 if (!seedNodes.isEmpty()) {
170 overrides.put(SEED_NODES_PARAM, seedNodes);
172 overrides.put(DATA_CENTER_PARAM, dataCenter);
174 final Config config = ConfigFactory.parseMap(overrides)
175 .withFallback(ConfigFactory.load());
177 // Create a classic Akka system since thats what we will have in osgi
178 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
179 final ActorRef<BootstrapCommand> eosBootstrap =
180 Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
182 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
183 GetRunningContext::new,
184 Duration.ofSeconds(5),
185 Adapter.toTyped(system.scheduler()));
186 final RunningContext runningContext = ask.toCompletableFuture().get();
188 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
189 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
192 private static Behavior<BootstrapCommand> rootBehavior() {
193 return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
196 protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
197 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
198 registerCandidates(candidateRegistry, entity, members);
201 protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
202 final DOMEntity entity, final String... members) {
203 for (final String member : members) {
204 candidateRegistry.tell(new RegisterCandidate(entity, member));
208 protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
209 final String... members) {
210 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
211 for (final String member : members) {
212 candidateRegistry.tell(new UnregisterCandidate(entity, member));
216 protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
217 final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
218 final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
219 listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
224 protected static void reachableMember(final ClusterNode node, final String... role) {
225 reachableMember(node.getOwnerSupervisor(), role);
228 protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
229 final String... role) {
230 ownerSupervisor.tell(new MemberReachableEvent(
231 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
234 protected static void unreachableMember(final ClusterNode node, final String... role) {
235 unreachableMember(node.getOwnerSupervisor(), role);
238 protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
239 final String... role) {
240 ownerSupervisor.tell(new MemberUnreachableEvent(
241 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
244 protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
245 await().until(() -> {
246 final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
247 final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
248 AskPattern.ask(distributedData.replicator(),
249 replyTo -> new Replicator.Get<>(
250 new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
251 Duration.ofSeconds(5),
252 clusterNode.getActorSystem().scheduler());
254 final Replicator.GetResponse<LWWRegister<String>> response =
255 ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
257 if (response instanceof Replicator.GetSuccess) {
258 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
259 return !owner.isEmpty();
266 protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
267 final CompletionStage<OwnerSupervisorReply> ask =
268 AskPattern.ask(clusterNode.getOwnerSupervisor(),
269 ActivateDataCenter::new,
270 Duration.ofSeconds(20),
271 clusterNode.actorSystem.scheduler());
272 return ask.toCompletableFuture();
275 protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
276 final CompletionStage<OwnerSupervisorReply> ask =
277 AskPattern.ask(clusterNode.getOwnerSupervisor(),
278 DeactivateDataCenter::new,
279 Duration.ofSeconds(20),
280 clusterNode.actorSystem.scheduler());
281 return ask.toCompletableFuture();
284 protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
285 final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
286 await().until(() -> !listener.getChanges().isEmpty());
288 await().untilAsserted(() -> {
289 final List<DOMEntityOwnershipChange> changes = listener.getChanges();
290 final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
291 assertEquals(entity, domEntityOwnershipChange.getEntity());
293 assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
294 assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
295 assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
299 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
300 verifyNoNotifications(listener, 2);
303 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
304 await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
307 protected static void verifyNoAdditionalNotifications(
308 final MockEntityOwnershipListener listener, long delaySeconds) {
309 listener.resetListener();
310 verifyNoNotifications(listener, delaySeconds);
313 protected static final class ClusterNode {
314 private final int port;
315 private final List<String> roles;
316 private final akka.actor.typed.ActorSystem<Void> actorSystem;
317 private final ActorRef<BootstrapCommand> eosBootstrap;
318 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
319 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
320 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
322 private ClusterNode(final int port,
323 final List<String> roles,
324 final ActorSystem actorSystem,
325 final ActorRef<BootstrapCommand> eosBootstrap,
326 final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
327 final ActorRef<CandidateRegistryCommand> candidateRegistry,
328 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
331 this.actorSystem = Adapter.toTyped(actorSystem);
332 this.eosBootstrap = eosBootstrap;
333 this.listenerRegistry = listenerRegistry;
334 this.candidateRegistry = candidateRegistry;
335 this.ownerSupervisor = ownerSupervisor;
338 public int getPort() {
342 public akka.actor.typed.ActorSystem<Void> getActorSystem() {
346 public ActorRef<BootstrapCommand> getEosBootstrap() {
350 public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
351 return listenerRegistry;
354 public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
355 return candidateRegistry;
358 public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
359 return ownerSupervisor;
362 public List<String> getRoles() {
367 protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
369 private final Logger log;
371 private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
372 private final String member;
374 public MockEntityOwnershipListener(final String member) {
375 log = LoggerFactory.getLogger("EOS-listener-" + member);
376 this.member = member;
380 public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
381 log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
382 log.info("{} changes: {}", member, changes.size());
383 changes.add(ownershipChange);
386 public List<DOMEntityOwnershipChange> getChanges() {
390 public void resetListener() {
395 protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
396 private ActorSystem classicActorSystem;
398 protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
399 throws ExecutionException, InterruptedException {
400 super(classicActorSystem, CODEC_CONTEXT);
401 this.classicActorSystem = classicActorSystem;
404 protected void reachableMember(final String... role) {
405 AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
408 public void unreachableMember(final String... role) {
409 AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
412 public ActorSystem getActorSystem() {
413 return classicActorSystem;