Expose entity details in MDSAL
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.ORMap;
21 import akka.cluster.ddata.ORSet;
22 import akka.cluster.ddata.SelfUniqueAddress;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import akka.cluster.typed.Cluster;
27 import akka.cluster.typed.Subscribe;
28 import com.google.common.collect.HashMultimap;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Sets;
33 import java.time.Duration;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.function.BiPredicate;
42 import java.util.stream.Collectors;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
53 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
54 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
55 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
56 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
57 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
58 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
59 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
60 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63 import scala.collection.JavaConverters;
64
65 /**
66  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
67  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
68  * On cluster up/down etc. events the owners are reassigned if possible.
69  */
70 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
71
72     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
73     private static final String DATACENTER_PREFIX = "dc-";
74
75     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
76
77     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
78     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
79     // running in a cluster-singleton
80     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
81
82     private final Cluster cluster;
83     private final SelfUniqueAddress node;
84     private final String dataCenter;
85
86     private final Set<String> activeMembers;
87
88     // currently registered candidates
89     private final Map<DOMEntity, Set<String>> currentCandidates;
90     // current owners
91     private final Map<DOMEntity, String> currentOwners;
92     // reverse lookup of owner to entity
93     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
94
95     // only reassign owner for those entities that lost this candidate or is not reachable
96     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
97             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
98
99     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
100                             final Map<DOMEntity, Set<String>> currentCandidates,
101                             final Map<DOMEntity, String> currentOwners) {
102         super(context);
103
104         final DistributedData distributedData = DistributedData.get(context.getSystem());
105         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
106
107         cluster = Cluster.get(context.getSystem());
108         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
109         dataCenter = extractDatacenterRole(cluster.selfMember());
110
111         node = distributedData.selfUniqueAddress();
112         activeMembers = getActiveMembers();
113
114         this.currentCandidates = currentCandidates;
115         this.currentOwners = currentOwners;
116
117         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
118             ownerToEntity.put(entry.getValue(), entry.getKey());
119         }
120
121         // check whether we have any unreachable/missing owners
122         reassignUnreachableOwners();
123         assignMissingOwners();
124
125         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
126                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
127                     if (event instanceof ClusterEvent.MemberUp) {
128                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
129                     } else {
130                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
131                     }
132                 });
133         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
134
135         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
136                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
137                     if (event instanceof ClusterEvent.ReachableMember) {
138                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
139                     } else {
140                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
141                     }
142                 });
143         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
144
145         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
146             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
147
148         LOG.debug("Owner Supervisor started");
149     }
150
151     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
152                                                           final Map<DOMEntity, String> currentOwners) {
153         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
154     }
155
156     @Override
157     public Receive<OwnerSupervisorCommand> createReceive() {
158         return newReceiveBuilder()
159                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
160                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
161                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
162                 .onMessage(MemberUpEvent.class, this::onPeerUp)
163                 .onMessage(MemberDownEvent.class, this::onPeerDown)
164                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
165                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
166                 .onMessage(GetEntitiesRequest.class, this::onGetEntities)
167                 .onMessage(GetEntityRequest.class, this::onGetEntity)
168                 .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
169                 .build();
170     }
171
172     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
173         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
174         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
175         return IdleSupervisor.create();
176     }
177
178     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
179         LOG.debug("Owner has changed for {}", command.getResponse().key());
180         return this;
181     }
182
183     private void reassignUnreachableOwners() {
184         final Set<String> ownersToReassign = new HashSet<>();
185         for (final String owner : ownerToEntity.keys()) {
186             if (!activeMembers.contains(owner)) {
187                 ownersToReassign.add(owner);
188             }
189         }
190
191         for (final String owner : ownersToReassign) {
192             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
193         }
194     }
195
196     private void assignMissingOwners() {
197         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
198             if (!currentOwners.containsKey(entry.getKey())) {
199                 assignOwnerFor(entry.getKey());
200             }
201         }
202     }
203
204     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
205         LOG.debug("onCandidatesChanged {}", message.getResponse());
206         if (message.getResponse() instanceof Replicator.Changed) {
207             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
208                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
209             processCandidateChanges(changed.get(CandidateRegistry.KEY));
210         }
211         return this;
212     }
213
214     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
215         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
216         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
217             processCandidatesFor(entry.getKey(), entry.getValue());
218         }
219     }
220
221     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
222         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
223
224         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
225         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
226         // no owner
227         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
228             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
229             currentCandidates.put(entity, new HashSet<>(candidates));
230
231             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
232             assignOwnerFor(entity);
233
234             return;
235         }
236
237         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
238         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
239
240         LOG.debug("currently present candidates: {}", currentlyPresent);
241         LOG.debug("difference: {}", difference);
242
243         final List<String> ownersToReassign = new ArrayList<>();
244
245         // first add/remove candidates from entities
246         for (final String toCheck : difference) {
247             if (!currentlyPresent.contains(toCheck)) {
248                 // add new candidate
249                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
250                 currentCandidates.get(entity).add(toCheck);
251
252                 if (!currentOwners.containsKey(entity)) {
253                     // might as well assign right away when we don't have an owner
254                     assignOwnerFor(entity);
255                 }
256
257                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
258                 continue;
259             }
260
261             if (!candidates.contains(toCheck)) {
262                 // remove candidate
263                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
264                 currentCandidates.get(entity).remove(toCheck);
265                 if (ownerToEntity.containsKey(toCheck)) {
266                     ownersToReassign.add(toCheck);
267                 }
268             }
269         }
270
271         // then reassign those that need new owners
272         for (final String toReassign : ownersToReassign) {
273             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
274                     reassignPredicate);
275         }
276
277         if (currentCandidates.get(entity) == null) {
278             LOG.debug("Last candidate removed for {}", entity);
279         } else {
280             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
281         }
282     }
283
284     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
285                                        final BiPredicate<DOMEntity, String> predicate) {
286         LOG.debug("Reassigning owners for {}", entities);
287         for (final DOMEntity entity : entities) {
288             if (predicate.test(entity, oldOwner)) {
289                 ownerToEntity.remove(oldOwner, entity);
290                 assignOwnerFor(entity);
291             }
292         }
293     }
294
295     private boolean isActiveCandidate(final String candidate) {
296         return activeMembers.contains(candidate);
297     }
298
299     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
300         return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
301     }
302
303     private void assignOwnerFor(final DOMEntity entity) {
304         final Set<String> candidatesForEntity = currentCandidates.get(entity);
305         if (candidatesForEntity.isEmpty()) {
306             LOG.debug("No candidates present for entity: {}", entity);
307             removeOwner(entity);
308             return;
309         }
310
311         String pickedCandidate = null;
312         for (final String candidate : candidatesForEntity) {
313             if (activeMembers.contains(candidate)) {
314                 pickedCandidate = candidate;
315                 break;
316             }
317         }
318         if (pickedCandidate == null) {
319             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
320                     entity, activeMembers, currentCandidates.get(entity));
321             // no candidate is reachable so only remove owner if necessary
322             removeOwner(entity);
323             return;
324         }
325         ownerToEntity.put(pickedCandidate, entity);
326
327         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
328         currentOwners.put(entity, pickedCandidate);
329         writeNewOwner(entity, pickedCandidate);
330     }
331
332     private void removeOwner(final DOMEntity entity) {
333         if (currentOwners.containsKey(entity)) {
334             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
335             // writes for the same key
336             currentOwners.remove(entity);
337
338             writeNewOwner(entity, "");
339         }
340     }
341
342     private void writeNewOwner(final DOMEntity entity, final String candidate) {
343         ownerReplicator.askUpdate(
344                 askReplyTo -> new Replicator.Update<>(
345                         new LWWRegisterKey<>(entity.toString()),
346                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
347                         Replicator.writeLocal(),
348                         askReplyTo,
349                         register -> register.withValue(node, candidate, clock)),
350                 OwnerChanged::new);
351     }
352
353     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
354         LOG.debug("Received MemberUp : {}", event);
355
356         handleReachableEvent(event.getRoles());
357         return this;
358     }
359
360     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
361         LOG.debug("Received MemberReachable : {}", event);
362
363         handleReachableEvent(event.getRoles());
364         return this;
365     }
366
367     private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
368         request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
369         return this;
370     }
371
372     private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
373         final DOMEntity entity = extractEntity(request);
374         request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
375         return this;
376     }
377
378     private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
379         request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
380         return this;
381     }
382
383     private void handleReachableEvent(final Set<String> roles) {
384         if (roles.contains(dataCenter)) {
385             activeMembers.add(extractRole(roles));
386             assignMissingOwners();
387         } else {
388             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
389         }
390     }
391
392     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
393         LOG.debug("Received MemberDown : {}", event);
394
395         handleUnreachableEvent(event.getRoles());
396         return this;
397     }
398
399     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
400         LOG.debug("Received MemberUnreachable : {}", event);
401
402         handleUnreachableEvent(event.getRoles());
403         return this;
404     }
405
406     private void handleUnreachableEvent(final Set<String> roles) {
407         if (roles.contains(dataCenter)) {
408             activeMembers.remove(extractRole(roles));
409             reassignUnreachableOwners();
410         } else {
411             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
412         }
413     }
414
415     private Set<String> getActiveMembers() {
416         final Set<String> members = new HashSet<>();
417         cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
418         // filter out unreachable
419         members.removeAll(cluster.state().getUnreachable().stream()
420                 .map(OwnerSupervisor::extractRole)
421                 .collect(Collectors.toSet()));
422
423         // filter out members not from our datacenter
424         cluster.state().getMembers().forEach(member -> {
425             if (!member.roles().contains(dataCenter)) {
426                 members.remove(extractRole(member));
427             }
428         });
429
430         return members;
431     }
432
433     private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
434         return new DOMEntity(request.getType().getValue(), request.getName().getValue());
435     }
436
437     private static String extractRole(final Member member) {
438         return extractRole(member.getRoles());
439     }
440
441     private static String extractRole(final Set<String> roles) {
442         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
443                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
444     }
445
446     private String extractDatacenterRole(final Member member) {
447         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
448                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
449     }
450 }