/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.eos.akka;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;

import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58 public abstract class AbstractNativeEosTest {
60 public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
61 public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
63 protected static final String DEFAULT_DATACENTER = "dc-default";
65 protected static final List<String> TWO_NODE_SEED_NODES =
66 List.of("akka://ClusterSystem@127.0.0.1:2550",
67 "akka://ClusterSystem@127.0.0.1:2551");
69 protected static final List<String> THREE_NODE_SEED_NODES =
70 List.of("akka://ClusterSystem@127.0.0.1:2550",
71 "akka://ClusterSystem@127.0.0.1:2551",
72 "akka://ClusterSystem@127.0.0.1:2552");
74 protected static final List<String> DATACENTER_SEED_NODES =
75 List.of("akka://ClusterSystem@127.0.0.1:2550",
76 "akka://ClusterSystem@127.0.0.1:2551",
77 "akka://ClusterSystem@127.0.0.1:2552",
78 "akka://ClusterSystem@127.0.0.1:2553");
80 private static final String REMOTE_PROTOCOL = "akka";
81 private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
82 private static final String ROLE_PARAM = "akka.cluster.roles";
83 private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
84 private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
86 protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
87 final List<String> seedNodes)
88 throws ExecutionException, InterruptedException {
89 final Map<String, Object> overrides = new HashMap<>();
90 overrides.put(PORT_PARAM, port);
91 overrides.put(ROLE_PARAM, roles);
92 if (!seedNodes.isEmpty()) {
93 overrides.put(SEED_NODES_PARAM, seedNodes);
96 final Config config = ConfigFactory.parseMap(overrides)
97 .withFallback(ConfigFactory.load());
99 // Create a classic Akka system since thats what we will have in osgi
100 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
102 return new MockNativeEntityOwnershipService(system);
105 protected static ClusterNode startupRemote(final int port, final List<String> roles)
106 throws ExecutionException, InterruptedException {
107 return startup(port, roles, THREE_NODE_SEED_NODES);
110 protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
111 throws ExecutionException, InterruptedException {
112 return startup(port, roles, seedNodes);
115 protected static ClusterNode startup(final int port, final List<String> roles)
116 throws ExecutionException, InterruptedException {
117 return startup(port, roles, List.of());
120 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
121 throws ExecutionException, InterruptedException {
123 return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
126 protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
127 final Supplier<Behavior<BootstrapCommand>> bootstrap)
128 throws ExecutionException, InterruptedException {
129 // Override the configuration
130 final Map<String, Object> overrides = new HashMap<>(4);
131 overrides.put(PORT_PARAM, port);
132 overrides.put(ROLE_PARAM, roles);
133 if (!seedNodes.isEmpty()) {
134 overrides.put(SEED_NODES_PARAM, seedNodes);
137 final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
139 // Create a classic Akka system since thats what we will have in osgi
140 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
141 final ActorRef<BootstrapCommand> eosBootstrap =
142 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
144 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
145 GetRunningContext::new,
146 Duration.ofSeconds(5),
147 Adapter.toTyped(system.scheduler()));
148 final RunningContext runningContext = ask.toCompletableFuture().get();
150 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
151 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
154 protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
155 final List<String> seedNodes, final String dataCenter)
156 throws ExecutionException, InterruptedException {
157 final Map<String, Object> overrides = new HashMap<>();
158 overrides.put(PORT_PARAM, port);
159 overrides.put(ROLE_PARAM, roles);
160 if (!seedNodes.isEmpty()) {
161 overrides.put(SEED_NODES_PARAM, seedNodes);
163 overrides.put(DATA_CENTER_PARAM, dataCenter);
165 final Config config = ConfigFactory.parseMap(overrides)
166 .withFallback(ConfigFactory.load());
168 // Create a classic Akka system since thats what we will have in osgi
169 final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
170 final ActorRef<BootstrapCommand> eosBootstrap =
171 Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
173 final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
174 GetRunningContext::new,
175 Duration.ofSeconds(5),
176 Adapter.toTyped(system.scheduler()));
177 final RunningContext runningContext = ask.toCompletableFuture().get();
179 return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
180 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
183 private static Behavior<BootstrapCommand> rootBehavior() {
184 return Behaviors.setup(context -> EOSMain.create());
187 protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
188 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
189 registerCandidates(candidateRegistry, entity, members);
192 protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
193 final DOMEntity entity, final String... members) {
194 for (final String member : members) {
195 candidateRegistry.tell(new RegisterCandidate(entity, member));
199 protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
200 final String... members) {
201 final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
202 for (final String member : members) {
203 candidateRegistry.tell(new UnregisterCandidate(entity, member));
207 protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
208 final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
209 final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
210 listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
215 protected static void reachableMember(final ClusterNode node, final String... role) {
216 reachableMember(node.getOwnerSupervisor(), role);
219 protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
220 final String... role) {
221 ownerSupervisor.tell(new MemberReachableEvent(
222 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
225 protected static void unreachableMember(final ClusterNode node, final String... role) {
226 unreachableMember(node.getOwnerSupervisor(), role);
229 protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
230 final String... role) {
231 ownerSupervisor.tell(new MemberUnreachableEvent(
232 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
235 protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
236 await().until(() -> {
237 final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
238 final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
239 AskPattern.ask(distributedData.replicator(),
240 replyTo -> new Replicator.Get<>(
241 new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
242 Duration.ofSeconds(5),
243 clusterNode.getActorSystem().scheduler());
245 final Replicator.GetResponse<LWWRegister<String>> response =
246 ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
248 if (response instanceof Replicator.GetSuccess) {
249 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
250 return !owner.isEmpty();
257 protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
258 final CompletionStage<OwnerSupervisorReply> ask =
259 AskPattern.ask(clusterNode.getOwnerSupervisor(),
260 ActivateDataCenter::new,
261 Duration.ofSeconds(20),
262 clusterNode.actorSystem.scheduler());
263 return ask.toCompletableFuture();
266 protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
267 final CompletionStage<OwnerSupervisorReply> ask =
268 AskPattern.ask(clusterNode.getOwnerSupervisor(),
269 DeactivateDataCenter::new,
270 Duration.ofSeconds(20),
271 clusterNode.actorSystem.scheduler());
272 return ask.toCompletableFuture();
275 protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
276 final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
277 await().until(() -> !listener.getChanges().isEmpty());
279 await().untilAsserted(() -> {
280 final List<DOMEntityOwnershipChange> changes = listener.getChanges();
281 final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
282 assertEquals(entity, domEntityOwnershipChange.getEntity());
284 assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
285 assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
286 assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
290 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
291 verifyNoNotifications(listener, 2);
294 protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
295 await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
298 protected static void verifyNoAdditionalNotifications(
299 final MockEntityOwnershipListener listener, long delaySeconds) {
300 listener.resetListener();
301 verifyNoNotifications(listener, delaySeconds);
304 protected static final class ClusterNode {
305 private final int port;
306 private final List<String> roles;
307 private final akka.actor.typed.ActorSystem<Void> actorSystem;
308 private final ActorRef<BootstrapCommand> eosBootstrap;
309 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
310 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
311 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
313 private ClusterNode(final int port,
314 final List<String> roles,
315 final ActorSystem actorSystem,
316 final ActorRef<BootstrapCommand> eosBootstrap,
317 final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
318 final ActorRef<CandidateRegistryCommand> candidateRegistry,
319 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
322 this.actorSystem = Adapter.toTyped(actorSystem);
323 this.eosBootstrap = eosBootstrap;
324 this.listenerRegistry = listenerRegistry;
325 this.candidateRegistry = candidateRegistry;
326 this.ownerSupervisor = ownerSupervisor;
329 public int getPort() {
333 public akka.actor.typed.ActorSystem<Void> getActorSystem() {
337 public ActorRef<BootstrapCommand> getEosBootstrap() {
341 public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
342 return listenerRegistry;
345 public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
346 return candidateRegistry;
349 public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
350 return ownerSupervisor;
353 public List<String> getRoles() {
358 protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
360 private final Logger log;
362 private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
363 private final String member;
365 public MockEntityOwnershipListener(final String member) {
366 log = LoggerFactory.getLogger("EOS-listener-" + member);
367 this.member = member;
371 public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
372 log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
373 log.info("{} changes: {}", member, changes.size());
374 changes.add(ownershipChange);
377 public List<DOMEntityOwnershipChange> getChanges() {
381 public void resetListener() {
386 protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
387 private ActorSystem classicActorSystem;
389 protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
390 throws ExecutionException, InterruptedException {
391 super(classicActorSystem);
392 this.classicActorSystem = classicActorSystem;
395 protected void reachableMember(final String... role) {
396 AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
399 public void unreachableMember(final String... role) {
400 AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
403 public ActorSystem getActorSystem() {
404 return classicActorSystem;