Cleanup OwnerSupervisor a bit
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.ddata.LWWRegister;
20 import akka.cluster.ddata.LWWRegisterKey;
21 import akka.cluster.ddata.ORMap;
22 import akka.cluster.ddata.ORSet;
23 import akka.cluster.ddata.SelfUniqueAddress;
24 import akka.cluster.ddata.typed.javadsl.DistributedData;
25 import akka.cluster.ddata.typed.javadsl.Replicator;
26 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
27 import akka.cluster.typed.Cluster;
28 import akka.cluster.typed.Subscribe;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableSet;
32 import com.google.common.collect.Multimap;
33 import com.google.common.collect.Sets;
34 import java.time.Duration;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.function.BiPredicate;
42 import java.util.stream.Collectors;
43 import java.util.stream.StreamSupport;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
53 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
54 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
55 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
56 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
57 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
58 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
59 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
60 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.collection.JavaConverters;
65
66 /**
67  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
68  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
69  * On cluster up/down etc. events the owners are reassigned if possible.
70  */
71 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
72
73     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
74     private static final String DATACENTER_PREFIX = "dc-";
75
76     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
77
78     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
79     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
80     // running in a cluster-singleton
81     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
82
83     private final Cluster cluster;
84     private final SelfUniqueAddress node;
85     private final String dataCenter;
86
87     private final Set<String> activeMembers;
88
89     // currently registered candidates
90     private final Map<DOMEntity, Set<String>> currentCandidates;
91     // current owners
92     private final Map<DOMEntity, String> currentOwners;
93     // reverse lookup of owner to entity
94     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
95
96     // only reassign owner for those entities that lost this candidate or is not reachable
97     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
98             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
99
100     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
101                             final Map<DOMEntity, Set<String>> currentCandidates,
102                             final Map<DOMEntity, String> currentOwners) {
103         super(context);
104
105         final DistributedData distributedData = DistributedData.get(context.getSystem());
106         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
107
108         cluster = Cluster.get(context.getSystem());
109         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
110         dataCenter = extractDatacenterRole(cluster.selfMember());
111
112         node = distributedData.selfUniqueAddress();
113         activeMembers = getActiveMembers();
114
115         this.currentCandidates = currentCandidates;
116         this.currentOwners = currentOwners;
117
118         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
119             ownerToEntity.put(entry.getValue(), entry.getKey());
120         }
121
122         // check whether we have any unreachable/missing owners
123         reassignUnreachableOwners();
124         assignMissingOwners();
125
126         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
127                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
128                     if (event instanceof ClusterEvent.MemberUp) {
129                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
130                     } else {
131                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
132                     }
133                 });
134         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
135
136         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
137                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
138                     if (event instanceof ClusterEvent.ReachableMember) {
139                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
140                     } else {
141                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
142                     }
143                 });
144         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
145
146         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
147             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
148
149         LOG.debug("Owner Supervisor started");
150     }
151
152     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
153                                                           final Map<DOMEntity, String> currentOwners) {
154         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
155     }
156
157     @Override
158     public Receive<OwnerSupervisorCommand> createReceive() {
159         return newReceiveBuilder()
160                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
161                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
162                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
163                 .onMessage(MemberUpEvent.class, this::onPeerUp)
164                 .onMessage(MemberDownEvent.class, this::onPeerDown)
165                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
166                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
167                 .onMessage(GetEntitiesRequest.class, this::onGetEntities)
168                 .onMessage(GetEntityRequest.class, this::onGetEntity)
169                 .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
170                 .build();
171     }
172
173     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
174         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
175         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
176         return IdleSupervisor.create();
177     }
178
179     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
180         LOG.debug("Owner has changed for {}", command.getResponse().key());
181         return this;
182     }
183
184     private void reassignUnreachableOwners() {
185         final Set<String> ownersToReassign = new HashSet<>();
186         for (final String owner : ownerToEntity.keys()) {
187             if (!activeMembers.contains(owner)) {
188                 ownersToReassign.add(owner);
189             }
190         }
191
192         for (final String owner : ownersToReassign) {
193             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
194         }
195     }
196
197     private void assignMissingOwners() {
198         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
199             if (!currentOwners.containsKey(entry.getKey())) {
200                 assignOwnerFor(entry.getKey());
201             }
202         }
203     }
204
205     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
206         LOG.debug("onCandidatesChanged {}", message.getResponse());
207         if (message.getResponse() instanceof Replicator.Changed) {
208             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
209                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
210             processCandidateChanges(changed.get(CandidateRegistry.KEY));
211         }
212         return this;
213     }
214
215     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
216         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
217         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
218             processCandidatesFor(entry.getKey(), entry.getValue());
219         }
220     }
221
222     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
223         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
224
225         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
226         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
227         // no owner
228         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
229             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
230             currentCandidates.put(entity, new HashSet<>(candidates));
231
232             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
233             assignOwnerFor(entity);
234
235             return;
236         }
237
238         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
239         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
240
241         LOG.debug("currently present candidates: {}", currentlyPresent);
242         LOG.debug("difference: {}", difference);
243
244         final List<String> ownersToReassign = new ArrayList<>();
245
246         // first add/remove candidates from entities
247         for (final String toCheck : difference) {
248             if (!currentlyPresent.contains(toCheck)) {
249                 // add new candidate
250                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
251                 currentCandidates.get(entity).add(toCheck);
252
253                 if (!currentOwners.containsKey(entity)) {
254                     // might as well assign right away when we don't have an owner
255                     assignOwnerFor(entity);
256                 }
257
258                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
259                 continue;
260             }
261
262             if (!candidates.contains(toCheck)) {
263                 // remove candidate
264                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
265                 currentCandidates.get(entity).remove(toCheck);
266                 if (ownerToEntity.containsKey(toCheck)) {
267                     ownersToReassign.add(toCheck);
268                 }
269             }
270         }
271
272         // then reassign those that need new owners
273         for (final String toReassign : ownersToReassign) {
274             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
275                     reassignPredicate);
276         }
277
278         if (currentCandidates.get(entity) == null) {
279             LOG.debug("Last candidate removed for {}", entity);
280         } else {
281             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
282         }
283     }
284
285     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
286                                        final BiPredicate<DOMEntity, String> predicate) {
287         LOG.debug("Reassigning owners for {}", entities);
288         for (final DOMEntity entity : entities) {
289             if (predicate.test(entity, oldOwner)) {
290                 ownerToEntity.remove(oldOwner, entity);
291                 assignOwnerFor(entity);
292             }
293         }
294     }
295
296     private boolean isActiveCandidate(final String candidate) {
297         return activeMembers.contains(candidate);
298     }
299
300     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
301         return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
302     }
303
304     private void assignOwnerFor(final DOMEntity entity) {
305         final Set<String> candidatesForEntity = currentCandidates.get(entity);
306         if (candidatesForEntity.isEmpty()) {
307             LOG.debug("No candidates present for entity: {}", entity);
308             removeOwner(entity);
309             return;
310         }
311
312         String pickedCandidate = null;
313         for (final String candidate : candidatesForEntity) {
314             if (activeMembers.contains(candidate)) {
315                 pickedCandidate = candidate;
316                 break;
317             }
318         }
319         if (pickedCandidate == null) {
320             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
321                     entity, activeMembers, currentCandidates.get(entity));
322             // no candidate is reachable so only remove owner if necessary
323             removeOwner(entity);
324             return;
325         }
326         ownerToEntity.put(pickedCandidate, entity);
327
328         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
329         currentOwners.put(entity, pickedCandidate);
330         writeNewOwner(entity, pickedCandidate);
331     }
332
333     private void removeOwner(final DOMEntity entity) {
334         if (currentOwners.containsKey(entity)) {
335             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
336             // writes for the same key
337             currentOwners.remove(entity);
338
339             writeNewOwner(entity, "");
340         }
341     }
342
343     private void writeNewOwner(final DOMEntity entity, final String candidate) {
344         ownerReplicator.askUpdate(
345                 askReplyTo -> new Replicator.Update<>(
346                         new LWWRegisterKey<>(entity.toString()),
347                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
348                         Replicator.writeLocal(),
349                         askReplyTo,
350                         register -> register.withValue(node, candidate, clock)),
351                 OwnerChanged::new);
352     }
353
354     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
355         LOG.debug("Received MemberUp : {}", event);
356
357         handleReachableEvent(event.getRoles());
358         return this;
359     }
360
361     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
362         LOG.debug("Received MemberReachable : {}", event);
363
364         handleReachableEvent(event.getRoles());
365         return this;
366     }
367
368     private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
369         request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
370         return this;
371     }
372
373     private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
374         final DOMEntity entity = extractEntity(request);
375         request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
376         return this;
377     }
378
379     private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
380         request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
381         return this;
382     }
383
384     private void handleReachableEvent(final Set<String> roles) {
385         if (roles.contains(dataCenter)) {
386             activeMembers.add(extractRole(roles));
387             assignMissingOwners();
388         } else {
389             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
390         }
391     }
392
393     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
394         LOG.debug("Received MemberDown : {}", event);
395
396         handleUnreachableEvent(event.getRoles());
397         return this;
398     }
399
400     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
401         LOG.debug("Received MemberUnreachable : {}", event);
402
403         handleUnreachableEvent(event.getRoles());
404         return this;
405     }
406
407     private void handleUnreachableEvent(final Set<String> roles) {
408         if (roles.contains(dataCenter)) {
409             activeMembers.remove(extractRole(roles));
410             reassignUnreachableOwners();
411         } else {
412             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
413         }
414     }
415
416     private Set<String> getActiveMembers() {
417         final CurrentClusterState clusterState = cluster.state();
418         final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
419             .map(OwnerSupervisor::extractRole)
420             .collect(Collectors.toSet());
421
422         return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
423             // We are evaluating the set of roles for each member
424             .map(Member::getRoles)
425             // Filter out any members which do not share our dataCenter
426             .filter(roles -> roles.contains(dataCenter))
427             // Find first legal role
428             .map(OwnerSupervisor::extractRole)
429             // filter out unreachable roles
430             .filter(role -> !unreachableRoles.contains(role))
431             .collect(Collectors.toSet());
432     }
433
434     private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
435         return new DOMEntity(request.getType().getValue(), request.getName().getValue());
436     }
437
438     private static String extractRole(final Member member) {
439         return extractRole(member.getRoles());
440     }
441
442     private static String extractRole(final Set<String> roles) {
443         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
444                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
445     }
446
447     private static String extractDatacenterRole(final Member member) {
448         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
449                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
450     }
451 }