41c83d4723d5425cca424871f54d3a95e4d83880
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.ORMap;
21 import akka.cluster.ddata.ORSet;
22 import akka.cluster.ddata.SelfUniqueAddress;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import akka.cluster.typed.Cluster;
27 import akka.cluster.typed.Subscribe;
28 import com.google.common.collect.HashMultimap;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Sets;
33 import java.time.Duration;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.stream.Collectors;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
49 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
50 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.collection.JavaConverters;
54
55 /**
56  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
57  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
58  * On cluster up/down etc. events the owners are reassigned if possible.
59  */
60 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
61
62     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
63     private static final String DATACENTER_PREFIX = "dc";
64
65     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
66
67     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
68     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
69     // running in a cluster-singleton
70     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
71
72     private final Cluster cluster;
73     private final SelfUniqueAddress node;
74
75     private final Set<String> activeMembers;
76
77     // currently registered candidates
78     private final Map<DOMEntity, Set<String>> currentCandidates;
79     // current owners
80     private final Map<DOMEntity, String> currentOwners;
81     // reverse lookup of owner to entity
82     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
83
84     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
85                             final Map<DOMEntity, Set<String>> currentCandidates,
86                             final Map<DOMEntity, String> currentOwners) {
87         super(context);
88
89         final DistributedData distributedData = DistributedData.get(context.getSystem());
90         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
91
92         this.cluster = Cluster.get(context.getSystem());
93         this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
94
95         this.node = distributedData.selfUniqueAddress();
96         this.activeMembers = getActiveMembers(cluster);
97
98         this.currentCandidates = currentCandidates;
99         this.currentOwners = currentOwners;
100
101         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
102             ownerToEntity.put(entry.getValue(), entry.getKey());
103         }
104
105         // check whether we have any unreachable/missing owners
106         reassignUnreachableOwners();
107         assignMissingOwners();
108
109         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
110                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
111                     if (event instanceof ClusterEvent.MemberUp) {
112                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
113                     } else {
114                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
115                     }
116                 });
117         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
118
119         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
120                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
121                     if (event instanceof ClusterEvent.ReachableMember) {
122                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
123                     } else {
124                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
125                     }
126                 });
127         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
128
129         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
130             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
131
132         LOG.debug("Owner Supervisor started");
133     }
134
135     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
136                                                           final Map<DOMEntity, String> currentOwners) {
137         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
138     }
139
140     @Override
141     public Receive<OwnerSupervisorCommand> createReceive() {
142         return newReceiveBuilder()
143                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
144                 .onMessage(MemberUpEvent.class, this::onPeerUp)
145                 .onMessage(MemberDownEvent.class, this::onPeerDown)
146                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
147                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
148                 .build();
149     }
150
151     private void reassignUnreachableOwners() {
152         final Set<String> ownersToReassign = new HashSet<>();
153         for (final String owner : ownerToEntity.keys()) {
154             if (!activeMembers.contains(owner)) {
155                 ownersToReassign.add(owner);
156             }
157         }
158
159         for (final String owner : ownersToReassign) {
160             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
161         }
162     }
163
164     private void assignMissingOwners() {
165         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
166             if (!currentOwners.containsKey(entry.getKey())) {
167                 assignOwnerFor(entry.getKey());
168             }
169         }
170     }
171
172     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
173         LOG.debug("onCandidatesChanged {}", message.getResponse());
174         if (message.getResponse() instanceof Replicator.Changed) {
175             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
176                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
177             processCandidateChanges(changed.get(CandidateRegistry.KEY));
178         }
179         return this;
180     }
181
182     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
183         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
184         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
185             processCandidatesFor(entry.getKey(), entry.getValue());
186         }
187     }
188
189     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
190         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
191
192         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
193         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
194         // no owner
195         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
196             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
197             currentCandidates.put(entity, new HashSet<>(candidates));
198
199             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
200             assignOwnerFor(entity);
201
202             return;
203         }
204
205         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
206         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
207
208         LOG.debug("currently present candidates: {}", currentlyPresent);
209         LOG.debug("difference: {}", difference);
210
211         final List<String> ownersToReassign = new ArrayList<>();
212
213         // first add/remove candidates from entities
214         for (final String toCheck : difference) {
215             if (!currentlyPresent.contains(toCheck)) {
216                 // add new candidate
217                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
218                 currentCandidates.get(entity).add(toCheck);
219
220                 if (!currentOwners.containsKey(entity)) {
221                     // might as well assign right away when we don't have an owner
222                     assignOwnerFor(entity);
223                 }
224
225                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
226                 continue;
227             }
228
229             if (!candidates.contains(toCheck)) {
230                 // remove candidate
231                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
232                 currentCandidates.get(entity).remove(toCheck);
233                 if (ownerToEntity.containsKey(toCheck)) {
234                     ownersToReassign.add(toCheck);
235                 }
236             }
237         }
238
239         // then reassign those that need new owners
240         for (final String toReassign : ownersToReassign) {
241             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
242         }
243
244         if (currentCandidates.get(entity) == null) {
245             LOG.debug("Last candidate removed for {}", entity);
246         } else {
247             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
248         }
249     }
250
251     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
252         LOG.debug("Reassigning owners for {}", entities);
253         for (final DOMEntity entity : entities) {
254
255             // only reassign owner for those entities that lost this candidate or is not reachable
256             if (!activeMembers.contains(oldOwner)
257                     || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
258                 ownerToEntity.remove(oldOwner, entity);
259                 assignOwnerFor(entity);
260             }
261         }
262     }
263
264     private void assignOwnerFor(final DOMEntity entity) {
265         final Set<String> candidatesForEntity = currentCandidates.get(entity);
266         if (candidatesForEntity.isEmpty()) {
267             LOG.debug("No candidates present for entity: {}", entity);
268             removeOwner(entity);
269             return;
270         }
271
272         String pickedCandidate = null;
273         for (final String candidate : candidatesForEntity) {
274             if (activeMembers.contains(candidate)) {
275                 pickedCandidate = candidate;
276                 break;
277             }
278         }
279         if (pickedCandidate == null) {
280             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
281                     entity, activeMembers, currentCandidates.get(entity));
282             // no candidate is reachable so only remove owner if necessary
283             removeOwner(entity);
284             return;
285         }
286         ownerToEntity.put(pickedCandidate, entity);
287
288         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
289         currentOwners.put(entity, pickedCandidate);
290         writeNewOwner(entity, pickedCandidate);
291     }
292
293     private void removeOwner(final DOMEntity entity) {
294         if (currentOwners.containsKey(entity)) {
295             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
296             // writes for the same key
297             currentOwners.remove(entity);
298
299             writeNewOwner(entity, "");
300         }
301     }
302
303     private void writeNewOwner(final DOMEntity entity, final String candidate) {
304         ownerReplicator.askUpdate(
305                 askReplyTo -> new Replicator.Update<>(
306                         new LWWRegisterKey<>(entity.toString()),
307                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
308                         Replicator.writeLocal(),
309                         askReplyTo,
310                         register -> register.withValue(node, candidate, clock)),
311                 OwnerChanged::new);
312     }
313
314     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
315         LOG.debug("Received MemberUp : {}", event);
316
317         handleReachableEvent(event.getRoles());
318         return this;
319     }
320
321     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
322         LOG.debug("Received MemberReachable : {}", event);
323
324         handleReachableEvent(event.getRoles());
325         return this;
326     }
327
328     private void handleReachableEvent(final Set<String> roles) {
329         activeMembers.add(extractRole(roles));
330         assignMissingOwners();
331     }
332
333     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
334         LOG.debug("Received MemberDown : {}", event);
335
336         handleUnreachableEvent(event.getRoles());
337         return this;
338     }
339
340     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
341         LOG.debug("Received MemberUnreachable : {}", event);
342
343         handleUnreachableEvent(event.getRoles());
344         return this;
345     }
346
347     private void handleUnreachableEvent(final Set<String> roles) {
348         activeMembers.remove(extractRole(roles));
349         reassignUnreachableOwners();
350     }
351
352     private static Set<String> getActiveMembers(final Cluster cluster) {
353         final Set<String> activeMembers = new HashSet<>();
354         cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
355         activeMembers.removeAll(cluster.state().getUnreachable().stream()
356                 .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
357
358         return activeMembers;
359     }
360
361     private static String extractRole(final Member member) {
362         return extractRole(member.getRoles());
363     }
364
365     private static String extractRole(final Set<String> roles) {
366         return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
367                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
368     }
369 }