8d10119c796acaa9db370af38f951e116c3f681b
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.typed.ActorRef;
16 import akka.actor.typed.Behavior;
17 import akka.actor.typed.javadsl.Adapter;
18 import akka.actor.typed.javadsl.AskPattern;
19 import akka.actor.typed.javadsl.Behaviors;
20 import akka.cluster.ddata.LWWRegister;
21 import akka.cluster.ddata.LWWRegisterKey;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.time.Duration;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Supplier;
36 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
37 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
38 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
39 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
43 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
44 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
45 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
46 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
47 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
48 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
49 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
50 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public abstract class AbstractNativeEosTest {
55
56     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
57     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
58
59     protected static final List<String> TWO_NODE_SEED_NODES =
60             List.of("akka://ClusterSystem@127.0.0.1:2550",
61                     "akka://ClusterSystem@127.0.0.1:2551");
62
63     protected static final List<String> THREE_NODE_SEED_NODES =
64             List.of("akka://ClusterSystem@127.0.0.1:2550",
65                     "akka://ClusterSystem@127.0.0.1:2551",
66                     "akka://ClusterSystem@127.0.0.1:2552");
67
68     private static final String REMOTE_PROTOCOL = "akka";
69     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
70     private static final String ROLE_PARAM = "akka.cluster.roles";
71     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
72
73
74     protected static ClusterNode startupRemote(final int port, final List<String> roles)
75             throws ExecutionException, InterruptedException {
76         return startup(port, roles, THREE_NODE_SEED_NODES);
77     }
78
79     protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
80             throws ExecutionException, InterruptedException {
81         return startup(port, roles, seedNodes);
82     }
83
84     protected static ClusterNode startup(final int port, final List<String> roles)
85             throws ExecutionException, InterruptedException {
86         return startup(port, roles, List.of());
87     }
88
89     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
90             throws ExecutionException, InterruptedException {
91
92         return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
93     }
94
95     protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
96                                          final Supplier<Behavior<BootstrapCommand>> bootstrap)
97             throws ExecutionException, InterruptedException {
98         // Override the configuration
99         final Map<String, Object> overrides = new HashMap<>(4);
100         overrides.put(PORT_PARAM, port);
101         overrides.put(ROLE_PARAM, roles);
102         if (!seedNodes.isEmpty()) {
103             overrides.put(SEED_NODES_PARAM, seedNodes);
104         }
105
106         final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
107
108         // Create a classic Akka system since thats what we will have in osgi
109         final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
110         final ActorRef<BootstrapCommand> eosBootstrap =
111                 Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
112
113         final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
114                 GetRunningContext::new,
115                 Duration.ofSeconds(5),
116                 Adapter.toTyped(system.scheduler()));
117         final RunningContext runningContext = ask.toCompletableFuture().get();
118
119         return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
120                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
121     }
122
123     private static Behavior<BootstrapCommand> rootBehavior() {
124         return Behaviors.setup(context -> EOSMain.create());
125     }
126
127     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
128         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
129         registerCandidates(candidateRegistry, entity, members);
130     }
131
132     protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
133                                              final DOMEntity entity, final String... members) {
134         for (final String member : members) {
135             candidateRegistry.tell(new RegisterCandidate(entity, member));
136         }
137     }
138
139     protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
140                                                final String... members) {
141         final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
142         for (final String member : members) {
143             candidateRegistry.tell(new UnregisterCandidate(entity, member));
144         }
145     }
146
147     protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
148         final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
149         final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
150         listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
151
152         return listener;
153     }
154
155     protected static void reachableMember(final ClusterNode node, final String role) {
156         reachableMember(node.getOwnerSupervisor(), role);
157     }
158
159     protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
160         ownerSupervisor.tell(new MemberReachableEvent(
161                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
162     }
163
164     protected static void unreachableMember(final ClusterNode node, final String role) {
165         unreachableMember(node.getOwnerSupervisor(), role);
166     }
167
168     protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
169         ownerSupervisor.tell(new MemberUnreachableEvent(
170                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
171     }
172
173     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
174         await().until(() -> {
175             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
176             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
177                     AskPattern.ask(distributedData.replicator(),
178                             replyTo -> new Replicator.Get<>(
179                                     new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
180                             Duration.ofSeconds(5),
181                             clusterNode.getActorSystem().scheduler());
182
183             final Replicator.GetResponse<LWWRegister<String>> response =
184                     ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
185
186             if (response instanceof Replicator.GetSuccess) {
187                 final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
188                 return !owner.isEmpty();
189             }
190
191             return false;
192         });
193     }
194
195     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
196                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
197         await().until(() -> !listener.getChanges().isEmpty());
198
199         await().untilAsserted(() -> {
200             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
201             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
202             assertEquals(entity, domEntityOwnershipChange.getEntity());
203
204             assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
205             assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
206             assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
207         });
208     }
209
210     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
211         await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
212     }
213
214     protected static final class ClusterNode {
215         private final int port;
216         private final List<String> roles;
217         private final akka.actor.typed.ActorSystem<Void> actorSystem;
218         private final ActorRef<BootstrapCommand> eosBootstrap;
219         private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
220         private final ActorRef<CandidateRegistryCommand> candidateRegistry;
221         private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
222
223         private ClusterNode(final int port,
224                             final List<String> roles,
225                             final ActorSystem actorSystem,
226                             final ActorRef<BootstrapCommand> eosBootstrap,
227                             final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
228                             final ActorRef<CandidateRegistryCommand> candidateRegistry,
229                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
230             this.port = port;
231             this.roles = roles;
232             this.actorSystem = Adapter.toTyped(actorSystem);
233             this.eosBootstrap = eosBootstrap;
234             this.listenerRegistry = listenerRegistry;
235             this.candidateRegistry = candidateRegistry;
236             this.ownerSupervisor = ownerSupervisor;
237         }
238
239         public int getPort() {
240             return port;
241         }
242
243         public akka.actor.typed.ActorSystem<Void> getActorSystem() {
244             return actorSystem;
245         }
246
247         public ActorRef<BootstrapCommand> getEosBootstrap() {
248             return eosBootstrap;
249         }
250
251         public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
252             return listenerRegistry;
253         }
254
255         public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
256             return candidateRegistry;
257         }
258
259         public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
260             return ownerSupervisor;
261         }
262
263         public List<String> getRoles() {
264             return roles;
265         }
266     }
267
268     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
269
270         private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
271
272         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
273         private final String member;
274
275         public MockEntityOwnershipListener(final String member) {
276
277             this.member = member;
278         }
279
280         @Override
281         public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
282             log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
283             log.info("{} changes: {}", member, changes.size());
284             changes.add(ownershipChange);
285         }
286
287         public List<DOMEntityOwnershipChange> getChanges() {
288             return changes;
289         }
290     }
291 }