91b06342b945086f66b9436e5f54a9f7410d1a17
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.ORMap;
21 import akka.cluster.ddata.ORSet;
22 import akka.cluster.ddata.SelfUniqueAddress;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import akka.cluster.typed.Cluster;
27 import akka.cluster.typed.Subscribe;
28 import com.google.common.collect.HashMultimap;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Sets;
33 import java.time.Duration;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.stream.Collectors;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
51 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.collection.JavaConverters;
56
57 /**
58  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
59  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
60  * On cluster up/down etc. events the owners are reassigned if possible.
61  */
62 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
63
64     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
65     private static final String DATACENTER_PREFIX = "dc-";
66
67     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
68
69     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
70     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
71     // running in a cluster-singleton
72     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
73
74     private final Cluster cluster;
75     private final SelfUniqueAddress node;
76     private final String dataCenter;
77
78     private final Set<String> activeMembers;
79
80     // currently registered candidates
81     private final Map<DOMEntity, Set<String>> currentCandidates;
82     // current owners
83     private final Map<DOMEntity, String> currentOwners;
84     // reverse lookup of owner to entity
85     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
86
87     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
88                             final Map<DOMEntity, Set<String>> currentCandidates,
89                             final Map<DOMEntity, String> currentOwners) {
90         super(context);
91
92         final DistributedData distributedData = DistributedData.get(context.getSystem());
93         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
94
95         cluster = Cluster.get(context.getSystem());
96         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
97         dataCenter = extractDatacenterRole(cluster.selfMember());
98
99         node = distributedData.selfUniqueAddress();
100         activeMembers = getActiveMembers();
101
102         this.currentCandidates = currentCandidates;
103         this.currentOwners = currentOwners;
104
105         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
106             ownerToEntity.put(entry.getValue(), entry.getKey());
107         }
108
109         // check whether we have any unreachable/missing owners
110         reassignUnreachableOwners();
111         assignMissingOwners();
112
113         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
114                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
115                     if (event instanceof ClusterEvent.MemberUp) {
116                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
117                     } else {
118                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
119                     }
120                 });
121         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
122
123         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
124                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
125                     if (event instanceof ClusterEvent.ReachableMember) {
126                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
127                     } else {
128                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
129                     }
130                 });
131         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
132
133         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
134             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
135
136         LOG.debug("Owner Supervisor started");
137     }
138
139     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
140                                                           final Map<DOMEntity, String> currentOwners) {
141         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
142     }
143
144     @Override
145     public Receive<OwnerSupervisorCommand> createReceive() {
146         return newReceiveBuilder()
147                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
148                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
149                 .onMessage(MemberUpEvent.class, this::onPeerUp)
150                 .onMessage(MemberDownEvent.class, this::onPeerDown)
151                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
152                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
153                 .build();
154     }
155
156     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
157         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
158         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
159         return IdleSupervisor.create();
160     }
161
162     private void reassignUnreachableOwners() {
163         final Set<String> ownersToReassign = new HashSet<>();
164         for (final String owner : ownerToEntity.keys()) {
165             if (!activeMembers.contains(owner)) {
166                 ownersToReassign.add(owner);
167             }
168         }
169
170         for (final String owner : ownersToReassign) {
171             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
172         }
173     }
174
175     private void assignMissingOwners() {
176         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
177             if (!currentOwners.containsKey(entry.getKey())) {
178                 assignOwnerFor(entry.getKey());
179             }
180         }
181     }
182
183     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
184         LOG.debug("onCandidatesChanged {}", message.getResponse());
185         if (message.getResponse() instanceof Replicator.Changed) {
186             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
187                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
188             processCandidateChanges(changed.get(CandidateRegistry.KEY));
189         }
190         return this;
191     }
192
193     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
194         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
195         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
196             processCandidatesFor(entry.getKey(), entry.getValue());
197         }
198     }
199
200     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
201         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
202
203         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
204         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
205         // no owner
206         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
207             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
208             currentCandidates.put(entity, new HashSet<>(candidates));
209
210             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
211             assignOwnerFor(entity);
212
213             return;
214         }
215
216         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
217         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
218
219         LOG.debug("currently present candidates: {}", currentlyPresent);
220         LOG.debug("difference: {}", difference);
221
222         final List<String> ownersToReassign = new ArrayList<>();
223
224         // first add/remove candidates from entities
225         for (final String toCheck : difference) {
226             if (!currentlyPresent.contains(toCheck)) {
227                 // add new candidate
228                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
229                 currentCandidates.get(entity).add(toCheck);
230
231                 if (!currentOwners.containsKey(entity)) {
232                     // might as well assign right away when we don't have an owner
233                     assignOwnerFor(entity);
234                 }
235
236                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
237                 continue;
238             }
239
240             if (!candidates.contains(toCheck)) {
241                 // remove candidate
242                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
243                 currentCandidates.get(entity).remove(toCheck);
244                 if (ownerToEntity.containsKey(toCheck)) {
245                     ownersToReassign.add(toCheck);
246                 }
247             }
248         }
249
250         // then reassign those that need new owners
251         for (final String toReassign : ownersToReassign) {
252             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
253         }
254
255         if (currentCandidates.get(entity) == null) {
256             LOG.debug("Last candidate removed for {}", entity);
257         } else {
258             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
259         }
260     }
261
262     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
263         LOG.debug("Reassigning owners for {}", entities);
264         for (final DOMEntity entity : entities) {
265
266             // only reassign owner for those entities that lost this candidate or is not reachable
267             if (!activeMembers.contains(oldOwner)
268                     || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
269                 ownerToEntity.remove(oldOwner, entity);
270                 assignOwnerFor(entity);
271             }
272         }
273     }
274
275     private void assignOwnerFor(final DOMEntity entity) {
276         final Set<String> candidatesForEntity = currentCandidates.get(entity);
277         if (candidatesForEntity.isEmpty()) {
278             LOG.debug("No candidates present for entity: {}", entity);
279             removeOwner(entity);
280             return;
281         }
282
283         String pickedCandidate = null;
284         for (final String candidate : candidatesForEntity) {
285             if (activeMembers.contains(candidate)) {
286                 pickedCandidate = candidate;
287                 break;
288             }
289         }
290         if (pickedCandidate == null) {
291             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
292                     entity, activeMembers, currentCandidates.get(entity));
293             // no candidate is reachable so only remove owner if necessary
294             removeOwner(entity);
295             return;
296         }
297         ownerToEntity.put(pickedCandidate, entity);
298
299         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
300         currentOwners.put(entity, pickedCandidate);
301         writeNewOwner(entity, pickedCandidate);
302     }
303
304     private void removeOwner(final DOMEntity entity) {
305         if (currentOwners.containsKey(entity)) {
306             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
307             // writes for the same key
308             currentOwners.remove(entity);
309
310             writeNewOwner(entity, "");
311         }
312     }
313
314     private void writeNewOwner(final DOMEntity entity, final String candidate) {
315         ownerReplicator.askUpdate(
316                 askReplyTo -> new Replicator.Update<>(
317                         new LWWRegisterKey<>(entity.toString()),
318                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
319                         Replicator.writeLocal(),
320                         askReplyTo,
321                         register -> register.withValue(node, candidate, clock)),
322                 OwnerChanged::new);
323     }
324
325     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
326         LOG.debug("Received MemberUp : {}", event);
327
328         handleReachableEvent(event.getRoles());
329         return this;
330     }
331
332     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
333         LOG.debug("Received MemberReachable : {}", event);
334
335         handleReachableEvent(event.getRoles());
336         return this;
337     }
338
339     private void handleReachableEvent(final Set<String> roles) {
340         if (roles.contains(dataCenter)) {
341             activeMembers.add(extractRole(roles));
342             assignMissingOwners();
343         } else {
344             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
345         }
346     }
347
348     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
349         LOG.debug("Received MemberDown : {}", event);
350
351         handleUnreachableEvent(event.getRoles());
352         return this;
353     }
354
355     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
356         LOG.debug("Received MemberUnreachable : {}", event);
357
358         handleUnreachableEvent(event.getRoles());
359         return this;
360     }
361
362     private void handleUnreachableEvent(final Set<String> roles) {
363         if (roles.contains(dataCenter)) {
364             activeMembers.remove(extractRole(roles));
365             reassignUnreachableOwners();
366         } else {
367             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
368         }
369     }
370
371     private Set<String> getActiveMembers() {
372         final Set<String> members = new HashSet<>();
373         cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
374         // filter out unreachable
375         members.removeAll(cluster.state().getUnreachable().stream()
376                 .map(OwnerSupervisor::extractRole)
377                 .collect(Collectors.toSet()));
378
379         // filter out members not from our datacenter
380         cluster.state().getMembers().forEach(member -> {
381             if (!member.roles().contains(dataCenter)) {
382                 members.remove(extractRole(member));
383             }
384         });
385
386         return members;
387     }
388
389     private static String extractRole(final Member member) {
390         return extractRole(member.getRoles());
391     }
392
393     private static String extractRole(final Set<String> roles) {
394         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
395                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
396     }
397
398     private String extractDatacenterRole(final Member member) {
399         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
400                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
401     }
402 }