f66f25aa821d2267cc18633b9752e626171da23f
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.typed.ActorRef;
14 import akka.actor.typed.Behavior;
15 import akka.actor.typed.javadsl.AbstractBehavior;
16 import akka.actor.typed.javadsl.ActorContext;
17 import akka.actor.typed.javadsl.Behaviors;
18 import akka.actor.typed.javadsl.Receive;
19 import akka.cluster.ClusterEvent;
20 import akka.cluster.ClusterEvent.CurrentClusterState;
21 import akka.cluster.Member;
22 import akka.cluster.ddata.LWWRegister;
23 import akka.cluster.ddata.LWWRegisterKey;
24 import akka.cluster.ddata.ORMap;
25 import akka.cluster.ddata.ORSet;
26 import akka.cluster.ddata.SelfUniqueAddress;
27 import akka.cluster.ddata.typed.javadsl.DistributedData;
28 import akka.cluster.ddata.typed.javadsl.Replicator;
29 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
30 import akka.cluster.typed.Cluster;
31 import akka.cluster.typed.Subscribe;
32 import com.google.common.collect.HashMultimap;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableSet;
35 import com.google.common.collect.Multimap;
36 import com.google.common.collect.Sets;
37 import java.time.Duration;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.function.BiPredicate;
45 import java.util.stream.Collectors;
46 import java.util.stream.StreamSupport;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
53 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
54 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
55 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
56 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
57 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
58 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
59 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
60 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
61 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
62 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
63 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
64 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
65 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68 import scala.collection.JavaConverters;
69
70 /**
71  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
72  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
73  * On cluster up/down etc. events the owners are reassigned if possible.
74  */
75 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
76
77     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
78     private static final String DATACENTER_PREFIX = "dc-";
79
80     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
81
82     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
83     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
84     // running in a cluster-singleton
85     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
86
87     private final Cluster cluster;
88     private final SelfUniqueAddress node;
89     private final String dataCenter;
90
91     private final Set<String> activeMembers;
92
93     // currently registered candidates
94     private final Map<DOMEntity, Set<String>> currentCandidates;
95     // current owners
96     private final Map<DOMEntity, String> currentOwners;
97     // reverse lookup of owner to entity
98     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
99
100     // only reassign owner for those entities that lost this candidate or is not reachable
101     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
102             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
103
104     private final BindingInstanceIdentifierCodec iidCodec;
105
106     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
107                             final Map<DOMEntity, Set<String>> currentCandidates,
108                             final Map<DOMEntity, String> currentOwners,
109                             final BindingInstanceIdentifierCodec iidCodec) {
110         super(context);
111         this.iidCodec = requireNonNull(iidCodec);
112
113         final DistributedData distributedData = DistributedData.get(context.getSystem());
114         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
115
116         cluster = Cluster.get(context.getSystem());
117         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
118         dataCenter = extractDatacenterRole(cluster.selfMember());
119
120         node = distributedData.selfUniqueAddress();
121         activeMembers = getActiveMembers();
122
123         this.currentCandidates = currentCandidates;
124         this.currentOwners = currentOwners;
125
126         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
127             ownerToEntity.put(entry.getValue(), entry.getKey());
128         }
129
130         // check whether we have any unreachable/missing owners
131         reassignUnreachableOwners();
132         assignMissingOwners();
133
134         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
135                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
136                     if (event instanceof ClusterEvent.MemberUp) {
137                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
138                     } else {
139                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
140                     }
141                 });
142         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
143
144         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
145                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
146                     if (event instanceof ClusterEvent.ReachableMember) {
147                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
148                     } else {
149                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
150                     }
151                 });
152         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
153
154         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
155             Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
156
157         LOG.debug("Owner Supervisor started");
158     }
159
160     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
161             final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
162         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
163     }
164
165     @Override
166     public Receive<OwnerSupervisorCommand> createReceive() {
167         return newReceiveBuilder()
168                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
169                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
170                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
171                 .onMessage(MemberUpEvent.class, this::onPeerUp)
172                 .onMessage(MemberDownEvent.class, this::onPeerDown)
173                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
174                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
175                 .onMessage(GetEntitiesRequest.class, this::onGetEntities)
176                 .onMessage(GetEntityRequest.class, this::onGetEntity)
177                 .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
178                 .build();
179     }
180
181     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
182         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
183         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
184         return IdleSupervisor.create(iidCodec);
185     }
186
187     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
188         LOG.debug("Owner has changed for {}", command.getResponse().key());
189         return this;
190     }
191
192     private void reassignUnreachableOwners() {
193         final Set<String> ownersToReassign = new HashSet<>();
194         for (final String owner : ownerToEntity.keys()) {
195             if (!activeMembers.contains(owner)) {
196                 ownersToReassign.add(owner);
197             }
198         }
199
200         for (final String owner : ownersToReassign) {
201             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
202         }
203     }
204
205     private void assignMissingOwners() {
206         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
207             if (!currentOwners.containsKey(entry.getKey())) {
208                 assignOwnerFor(entry.getKey());
209             }
210         }
211     }
212
213     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
214         LOG.debug("onCandidatesChanged {}", message.getResponse());
215         if (message.getResponse() instanceof Replicator.Changed) {
216             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
217                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
218             processCandidateChanges(changed.get(CandidateRegistry.KEY));
219         }
220         return this;
221     }
222
223     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
224         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
225         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
226             processCandidatesFor(entry.getKey(), entry.getValue());
227         }
228     }
229
230     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
231         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
232
233         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
234         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
235         // no owner
236         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
237             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
238             currentCandidates.put(entity, new HashSet<>(candidates));
239
240             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
241             assignOwnerFor(entity);
242
243             return;
244         }
245
246         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
247         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
248
249         LOG.debug("currently present candidates: {}", currentlyPresent);
250         LOG.debug("difference: {}", difference);
251
252         final List<String> ownersToReassign = new ArrayList<>();
253
254         // first add/remove candidates from entities
255         for (final String toCheck : difference) {
256             if (!currentlyPresent.contains(toCheck)) {
257                 // add new candidate
258                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
259                 currentCandidates.get(entity).add(toCheck);
260
261                 if (!currentOwners.containsKey(entity)) {
262                     // might as well assign right away when we don't have an owner
263                     assignOwnerFor(entity);
264                 }
265
266                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
267                 continue;
268             }
269
270             if (!candidates.contains(toCheck)) {
271                 // remove candidate
272                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
273                 currentCandidates.get(entity).remove(toCheck);
274                 if (ownerToEntity.containsKey(toCheck)) {
275                     ownersToReassign.add(toCheck);
276                 }
277             }
278         }
279
280         // then reassign those that need new owners
281         for (final String toReassign : ownersToReassign) {
282             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
283                     reassignPredicate);
284         }
285
286         if (currentCandidates.get(entity) == null) {
287             LOG.debug("Last candidate removed for {}", entity);
288         } else {
289             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
290         }
291     }
292
293     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
294                                        final BiPredicate<DOMEntity, String> predicate) {
295         LOG.debug("Reassigning owners for {}", entities);
296         for (final DOMEntity entity : entities) {
297             if (predicate.test(entity, oldOwner)) {
298                 ownerToEntity.remove(oldOwner, entity);
299                 assignOwnerFor(entity);
300             }
301         }
302     }
303
304     private boolean isActiveCandidate(final String candidate) {
305         return activeMembers.contains(candidate);
306     }
307
308     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
309         return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
310     }
311
312     private void assignOwnerFor(final DOMEntity entity) {
313         final Set<String> candidatesForEntity = currentCandidates.get(entity);
314         if (candidatesForEntity.isEmpty()) {
315             LOG.debug("No candidates present for entity: {}", entity);
316             removeOwner(entity);
317             return;
318         }
319
320         String pickedCandidate = null;
321         for (final String candidate : candidatesForEntity) {
322             if (activeMembers.contains(candidate)) {
323                 pickedCandidate = candidate;
324                 break;
325             }
326         }
327         if (pickedCandidate == null) {
328             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
329                     entity, activeMembers, currentCandidates.get(entity));
330             // no candidate is reachable so only remove owner if necessary
331             removeOwner(entity);
332             return;
333         }
334         ownerToEntity.put(pickedCandidate, entity);
335
336         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
337         currentOwners.put(entity, pickedCandidate);
338         writeNewOwner(entity, pickedCandidate);
339     }
340
341     private void removeOwner(final DOMEntity entity) {
342         if (currentOwners.containsKey(entity)) {
343             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
344             // writes for the same key
345             currentOwners.remove(entity);
346
347             writeNewOwner(entity, "");
348         }
349     }
350
351     private void writeNewOwner(final DOMEntity entity, final String candidate) {
352         ownerReplicator.askUpdate(
353                 askReplyTo -> new Replicator.Update<>(
354                         new LWWRegisterKey<>(entity.toString()),
355                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
356                         Replicator.writeLocal(),
357                         askReplyTo,
358                         register -> register.withValue(node, candidate, clock)),
359                 OwnerChanged::new);
360     }
361
362     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
363         LOG.debug("Received MemberUp : {}", event);
364
365         handleReachableEvent(event.getRoles());
366         return this;
367     }
368
369     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
370         LOG.debug("Received MemberReachable : {}", event);
371
372         handleReachableEvent(event.getRoles());
373         return this;
374     }
375
376     private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
377         request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
378         return this;
379     }
380
381     private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
382         final DOMEntity entity = extractEntity(request);
383         request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
384         return this;
385     }
386
387     private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
388         request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
389         return this;
390     }
391
392     private void handleReachableEvent(final Set<String> roles) {
393         if (roles.contains(dataCenter)) {
394             activeMembers.add(extractRole(roles));
395             assignMissingOwners();
396         } else {
397             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
398         }
399     }
400
401     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
402         LOG.debug("Received MemberDown : {}", event);
403
404         handleUnreachableEvent(event.getRoles());
405         return this;
406     }
407
408     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
409         LOG.debug("Received MemberUnreachable : {}", event);
410
411         handleUnreachableEvent(event.getRoles());
412         return this;
413     }
414
415     private void handleUnreachableEvent(final Set<String> roles) {
416         if (roles.contains(dataCenter)) {
417             activeMembers.remove(extractRole(roles));
418             reassignUnreachableOwners();
419         } else {
420             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
421         }
422     }
423
424     private Set<String> getActiveMembers() {
425         final CurrentClusterState clusterState = cluster.state();
426         final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
427             .map(OwnerSupervisor::extractRole)
428             .collect(Collectors.toSet());
429
430         return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
431             // We are evaluating the set of roles for each member
432             .map(Member::getRoles)
433             // Filter out any members which do not share our dataCenter
434             .filter(roles -> roles.contains(dataCenter))
435             // Find first legal role
436             .map(OwnerSupervisor::extractRole)
437             // filter out unreachable roles
438             .filter(role -> !unreachableRoles.contains(role))
439             .collect(Collectors.toSet());
440     }
441
442     private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
443         final var name = request.getName();
444         final var iid = name.getInstanceIdentifier();
445         if (iid != null) {
446             return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
447         }
448         final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
449         return new DOMEntity(request.getType().getValue(), str);
450     }
451
452     private static String extractRole(final Member member) {
453         return extractRole(member.getRoles());
454     }
455
456     private static String extractRole(final Set<String> roles) {
457         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
458                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
459     }
460
461     private static String extractDatacenterRole(final Member member) {
462         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
463                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
464     }
465 }