Handle owner changed message
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.ORMap;
21 import akka.cluster.ddata.ORSet;
22 import akka.cluster.ddata.SelfUniqueAddress;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import akka.cluster.typed.Cluster;
27 import akka.cluster.typed.Subscribe;
28 import com.google.common.collect.HashMultimap;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Sets;
33 import java.time.Duration;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.function.BiPredicate;
42 import java.util.stream.Collectors;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
52 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
53 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.collection.JavaConverters;
57
58 /**
59  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
60  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
61  * On cluster up/down etc. events the owners are reassigned if possible.
62  */
63 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
64
65     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
66     private static final String DATACENTER_PREFIX = "dc-";
67
68     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
69
70     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
71     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
72     // running in a cluster-singleton
73     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
74
75     private final Cluster cluster;
76     private final SelfUniqueAddress node;
77     private final String dataCenter;
78
79     private final Set<String> activeMembers;
80
81     // currently registered candidates
82     private final Map<DOMEntity, Set<String>> currentCandidates;
83     // current owners
84     private final Map<DOMEntity, String> currentOwners;
85     // reverse lookup of owner to entity
86     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
87
88     // only reassign owner for those entities that lost this candidate or is not reachable
89     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
90             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
91
92     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
93                             final Map<DOMEntity, Set<String>> currentCandidates,
94                             final Map<DOMEntity, String> currentOwners) {
95         super(context);
96
97         final DistributedData distributedData = DistributedData.get(context.getSystem());
98         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
99
100         cluster = Cluster.get(context.getSystem());
101         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
102         dataCenter = extractDatacenterRole(cluster.selfMember());
103
104         node = distributedData.selfUniqueAddress();
105         activeMembers = getActiveMembers();
106
107         this.currentCandidates = currentCandidates;
108         this.currentOwners = currentOwners;
109
110         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
111             ownerToEntity.put(entry.getValue(), entry.getKey());
112         }
113
114         // check whether we have any unreachable/missing owners
115         reassignUnreachableOwners();
116         assignMissingOwners();
117
118         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
119                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
120                     if (event instanceof ClusterEvent.MemberUp) {
121                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
122                     } else {
123                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
124                     }
125                 });
126         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
127
128         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
129                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
130                     if (event instanceof ClusterEvent.ReachableMember) {
131                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
132                     } else {
133                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
134                     }
135                 });
136         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
137
138         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
139             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
140
141         LOG.debug("Owner Supervisor started");
142     }
143
144     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
145                                                           final Map<DOMEntity, String> currentOwners) {
146         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
147     }
148
149     @Override
150     public Receive<OwnerSupervisorCommand> createReceive() {
151         return newReceiveBuilder()
152                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
153                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
154                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
155                 .onMessage(MemberUpEvent.class, this::onPeerUp)
156                 .onMessage(MemberDownEvent.class, this::onPeerDown)
157                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
158                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
159                 .build();
160     }
161
162     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
163         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
164         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
165         return IdleSupervisor.create();
166     }
167
168     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
169         LOG.debug("Owner has changed for {}", command.getResponse().key());
170         return this;
171     }
172
173     private void reassignUnreachableOwners() {
174         final Set<String> ownersToReassign = new HashSet<>();
175         for (final String owner : ownerToEntity.keys()) {
176             if (!activeMembers.contains(owner)) {
177                 ownersToReassign.add(owner);
178             }
179         }
180
181         for (final String owner : ownersToReassign) {
182             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
183         }
184     }
185
186     private void assignMissingOwners() {
187         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
188             if (!currentOwners.containsKey(entry.getKey())) {
189                 assignOwnerFor(entry.getKey());
190             }
191         }
192     }
193
194     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
195         LOG.debug("onCandidatesChanged {}", message.getResponse());
196         if (message.getResponse() instanceof Replicator.Changed) {
197             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
198                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
199             processCandidateChanges(changed.get(CandidateRegistry.KEY));
200         }
201         return this;
202     }
203
204     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
205         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
206         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
207             processCandidatesFor(entry.getKey(), entry.getValue());
208         }
209     }
210
211     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
212         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
213
214         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
215         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
216         // no owner
217         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
218             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
219             currentCandidates.put(entity, new HashSet<>(candidates));
220
221             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
222             assignOwnerFor(entity);
223
224             return;
225         }
226
227         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
228         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
229
230         LOG.debug("currently present candidates: {}", currentlyPresent);
231         LOG.debug("difference: {}", difference);
232
233         final List<String> ownersToReassign = new ArrayList<>();
234
235         // first add/remove candidates from entities
236         for (final String toCheck : difference) {
237             if (!currentlyPresent.contains(toCheck)) {
238                 // add new candidate
239                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
240                 currentCandidates.get(entity).add(toCheck);
241
242                 if (!currentOwners.containsKey(entity)) {
243                     // might as well assign right away when we don't have an owner
244                     assignOwnerFor(entity);
245                 }
246
247                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
248                 continue;
249             }
250
251             if (!candidates.contains(toCheck)) {
252                 // remove candidate
253                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
254                 currentCandidates.get(entity).remove(toCheck);
255                 if (ownerToEntity.containsKey(toCheck)) {
256                     ownersToReassign.add(toCheck);
257                 }
258             }
259         }
260
261         // then reassign those that need new owners
262         for (final String toReassign : ownersToReassign) {
263             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
264                     reassignPredicate);
265         }
266
267         if (currentCandidates.get(entity) == null) {
268             LOG.debug("Last candidate removed for {}", entity);
269         } else {
270             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
271         }
272     }
273
274     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
275                                        final BiPredicate<DOMEntity, String> predicate) {
276         LOG.debug("Reassigning owners for {}", entities);
277         for (final DOMEntity entity : entities) {
278
279
280             if (predicate.test(entity, oldOwner)) {
281                 ownerToEntity.remove(oldOwner, entity);
282                 assignOwnerFor(entity);
283             }
284         }
285     }
286
287     private boolean isActiveCandidate(final String candidate) {
288         return activeMembers.contains(candidate);
289     }
290
291     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
292         return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
293     }
294
295     private void assignOwnerFor(final DOMEntity entity) {
296         final Set<String> candidatesForEntity = currentCandidates.get(entity);
297         if (candidatesForEntity.isEmpty()) {
298             LOG.debug("No candidates present for entity: {}", entity);
299             removeOwner(entity);
300             return;
301         }
302
303         String pickedCandidate = null;
304         for (final String candidate : candidatesForEntity) {
305             if (activeMembers.contains(candidate)) {
306                 pickedCandidate = candidate;
307                 break;
308             }
309         }
310         if (pickedCandidate == null) {
311             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
312                     entity, activeMembers, currentCandidates.get(entity));
313             // no candidate is reachable so only remove owner if necessary
314             removeOwner(entity);
315             return;
316         }
317         ownerToEntity.put(pickedCandidate, entity);
318
319         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
320         currentOwners.put(entity, pickedCandidate);
321         writeNewOwner(entity, pickedCandidate);
322     }
323
324     private void removeOwner(final DOMEntity entity) {
325         if (currentOwners.containsKey(entity)) {
326             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
327             // writes for the same key
328             currentOwners.remove(entity);
329
330             writeNewOwner(entity, "");
331         }
332     }
333
334     private void writeNewOwner(final DOMEntity entity, final String candidate) {
335         ownerReplicator.askUpdate(
336                 askReplyTo -> new Replicator.Update<>(
337                         new LWWRegisterKey<>(entity.toString()),
338                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
339                         Replicator.writeLocal(),
340                         askReplyTo,
341                         register -> register.withValue(node, candidate, clock)),
342                 OwnerChanged::new);
343     }
344
345     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
346         LOG.debug("Received MemberUp : {}", event);
347
348         handleReachableEvent(event.getRoles());
349         return this;
350     }
351
352     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
353         LOG.debug("Received MemberReachable : {}", event);
354
355         handleReachableEvent(event.getRoles());
356         return this;
357     }
358
359     private void handleReachableEvent(final Set<String> roles) {
360         if (roles.contains(dataCenter)) {
361             activeMembers.add(extractRole(roles));
362             assignMissingOwners();
363         } else {
364             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
365         }
366     }
367
368     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
369         LOG.debug("Received MemberDown : {}", event);
370
371         handleUnreachableEvent(event.getRoles());
372         return this;
373     }
374
375     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
376         LOG.debug("Received MemberUnreachable : {}", event);
377
378         handleUnreachableEvent(event.getRoles());
379         return this;
380     }
381
382     private void handleUnreachableEvent(final Set<String> roles) {
383         if (roles.contains(dataCenter)) {
384             activeMembers.remove(extractRole(roles));
385             reassignUnreachableOwners();
386         } else {
387             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
388         }
389     }
390
391     private Set<String> getActiveMembers() {
392         final Set<String> members = new HashSet<>();
393         cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
394         // filter out unreachable
395         members.removeAll(cluster.state().getUnreachable().stream()
396                 .map(OwnerSupervisor::extractRole)
397                 .collect(Collectors.toSet()));
398
399         // filter out members not from our datacenter
400         cluster.state().getMembers().forEach(member -> {
401             if (!member.roles().contains(dataCenter)) {
402                 members.remove(extractRole(member));
403             }
404         });
405
406         return members;
407     }
408
409     private static String extractRole(final Member member) {
410         return extractRole(member.getRoles());
411     }
412
413     private static String extractRole(final Set<String> roles) {
414         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
415                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
416     }
417
418     private String extractDatacenterRole(final Member member) {
419         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
420                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
421     }
422 }