Move {Identifiable,Persistent,}Payload
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.typed.ActorRef;
14 import akka.actor.typed.Behavior;
15 import akka.actor.typed.javadsl.ActorContext;
16 import akka.actor.typed.javadsl.Behaviors;
17 import akka.actor.typed.javadsl.Receive;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.ClusterEvent.CurrentClusterState;
20 import akka.cluster.Member;
21 import akka.cluster.ddata.LWWRegister;
22 import akka.cluster.ddata.LWWRegisterKey;
23 import akka.cluster.ddata.ORMap;
24 import akka.cluster.ddata.ORSet;
25 import akka.cluster.ddata.SelfUniqueAddress;
26 import akka.cluster.ddata.typed.javadsl.DistributedData;
27 import akka.cluster.ddata.typed.javadsl.Replicator;
28 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
29 import akka.cluster.typed.Cluster;
30 import akka.cluster.typed.Subscribe;
31 import akka.pattern.StatusReply;
32 import com.google.common.collect.HashMultimap;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableSet;
35 import com.google.common.collect.Multimap;
36 import com.google.common.collect.Sets;
37 import java.time.Duration;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.function.BiPredicate;
45 import java.util.stream.Collectors;
46 import java.util.stream.StreamSupport;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
53 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
54 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
55 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
56 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
57 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
58 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
59 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
60 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
61 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
62 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
63 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
64 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
65 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
66 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.collection.JavaConverters;
71
72 /**
73  * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
74  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
75  * On cluster up/down etc. events the owners are reassigned if possible.
76  */
77 public final class OwnerSupervisor extends AbstractSupervisor {
78
79     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
80     private static final String DATACENTER_PREFIX = "dc-";
81
82     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
83
84     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
85     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
86     // running in a cluster-singleton
87     private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
88
89     private final Cluster cluster;
90     private final SelfUniqueAddress node;
91     private final String dataCenter;
92
93     private final Set<String> activeMembers;
94
95     // currently registered candidates
96     private final Map<DOMEntity, Set<String>> currentCandidates;
97     // current owners
98     private final Map<DOMEntity, String> currentOwners;
99     // reverse lookup of owner to entity
100     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
101
102     // only reassign owner for those entities that lost this candidate or is not reachable
103     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
104             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
105
106     private final BindingInstanceIdentifierCodec iidCodec;
107
108     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
109                             final Map<DOMEntity, Set<String>> currentCandidates,
110                             final Map<DOMEntity, String> currentOwners,
111                             final BindingInstanceIdentifierCodec iidCodec) {
112         super(context);
113         this.iidCodec = requireNonNull(iidCodec);
114
115         final DistributedData distributedData = DistributedData.get(context.getSystem());
116         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
117
118         cluster = Cluster.get(context.getSystem());
119         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
120         dataCenter = extractDatacenterRole(cluster.selfMember());
121
122         node = distributedData.selfUniqueAddress();
123         activeMembers = getActiveMembers();
124
125         this.currentCandidates = currentCandidates;
126         this.currentOwners = currentOwners;
127
128         for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
129             ownerToEntity.put(entry.getValue(), entry.getKey());
130         }
131
132         // check whether we have any unreachable/missing owners
133         reassignUnreachableOwners();
134         assignMissingOwners();
135
136         final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
137                 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
138                     if (event instanceof ClusterEvent.MemberUp) {
139                         return new MemberUpEvent(event.member().address(), event.member().getRoles());
140                     } else {
141                         return new MemberDownEvent(event.member().address(), event.member().getRoles());
142                     }
143                 });
144         cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
145
146         final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
147                 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
148                     if (event instanceof ClusterEvent.ReachableMember) {
149                         return new MemberReachableEvent(event.member().address(), event.member().getRoles());
150                     } else {
151                         return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
152                     }
153                 });
154         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
155
156         candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
157
158         LOG.debug("Owner Supervisor started");
159     }
160
161     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
162             final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
163         return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
164     }
165
166     @Override
167     public Receive<OwnerSupervisorCommand> createReceive() {
168         return newReceiveBuilder()
169                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
170                 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
171                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
172                 .onMessage(MemberUpEvent.class, this::onPeerUp)
173                 .onMessage(MemberDownEvent.class, this::onPeerDown)
174                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
175                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
176                 .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
177                 .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
178                 .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
179                 .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
180                 .onMessage(ClearCandidates.class, this::finishClearCandidates)
181                 .build();
182     }
183
184     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
185         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
186         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
187         return IdleSupervisor.create(iidCodec);
188     }
189
190     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
191         LOG.debug("Owner has changed for {}", command.getResponse().key());
192         return this;
193     }
194
195     private void reassignUnreachableOwners() {
196         final Set<String> ownersToReassign = new HashSet<>();
197         for (final String owner : ownerToEntity.keys()) {
198             if (!isActiveCandidate(owner)) {
199                 ownersToReassign.add(owner);
200             }
201         }
202
203         for (final String owner : ownersToReassign) {
204             reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
205         }
206     }
207
208     private void assignMissingOwners() {
209         for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
210             if (!currentOwners.containsKey(entry.getKey())) {
211                 assignOwnerFor(entry.getKey());
212             }
213         }
214     }
215
216     private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
217         LOG.debug("onCandidatesChanged {}", message.getResponse());
218         if (message.getResponse() instanceof Replicator.Changed) {
219             final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
220                     (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
221             processCandidateChanges(changed.get(CandidateRegistry.KEY));
222         }
223         return this;
224     }
225
226     private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
227         final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
228         for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
229             processCandidatesFor(entry.getKey(), entry.getValue());
230         }
231     }
232
233     private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
234         LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
235
236         final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
237         // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
238         // no owner
239         if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
240             LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
241             currentCandidates.put(entity, new HashSet<>(candidates));
242
243             LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
244             assignOwnerFor(entity);
245
246             return;
247         }
248
249         final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
250         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
251
252         LOG.debug("currently present candidates: {}", currentlyPresent);
253         LOG.debug("difference: {}", difference);
254
255         final List<String> ownersToReassign = new ArrayList<>();
256
257         // first add/remove candidates from entities
258         for (final String toCheck : difference) {
259             if (!currentlyPresent.contains(toCheck)) {
260                 // add new candidate
261                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
262                 currentCandidates.get(entity).add(toCheck);
263
264                 final String currentOwner = currentOwners.get(entity);
265
266                 if (currentOwner == null || !activeMembers.contains(currentOwner)) {
267                     // might as well assign right away when we don't have an owner or its unreachable
268                     assignOwnerFor(entity);
269                 }
270
271                 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
272                 continue;
273             }
274
275             if (!candidates.contains(toCheck)) {
276                 // remove candidate
277                 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
278                 currentCandidates.get(entity).remove(toCheck);
279                 if (ownerToEntity.containsKey(toCheck)) {
280                     ownersToReassign.add(toCheck);
281                 }
282             }
283         }
284
285         // then reassign those that need new owners
286         for (final String toReassign : ownersToReassign) {
287             reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
288                     reassignPredicate);
289         }
290
291         if (currentCandidates.get(entity) == null) {
292             LOG.debug("Last candidate removed for {}", entity);
293         } else {
294             LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
295         }
296     }
297
298     private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
299                                        final BiPredicate<DOMEntity, String> predicate) {
300         LOG.debug("Reassigning owners for {}", entities);
301         for (final DOMEntity entity : entities) {
302             if (predicate.test(entity, oldOwner)) {
303
304                 if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
305                     // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
306                     // candidate
307                     LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
308                     continue;
309                 }
310                 ownerToEntity.remove(oldOwner, entity);
311                 assignOwnerFor(entity);
312             }
313         }
314     }
315
316     private boolean isActiveCandidate(final String candidate) {
317         return activeMembers.contains(candidate);
318     }
319
320     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
321         return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
322     }
323
324     private boolean hasSingleCandidate(final DOMEntity entity) {
325         return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
326     }
327
328     private void assignOwnerFor(final DOMEntity entity) {
329         final Set<String> candidatesForEntity = currentCandidates.get(entity);
330         if (candidatesForEntity.isEmpty()) {
331             LOG.debug("No candidates present for entity: {}", entity);
332             removeOwner(entity);
333             return;
334         }
335
336         String pickedCandidate = null;
337         for (final String candidate : candidatesForEntity) {
338             if (activeMembers.contains(candidate)) {
339                 pickedCandidate = candidate;
340                 break;
341             }
342         }
343         if (pickedCandidate == null) {
344             LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
345                     entity, activeMembers, currentCandidates.get(entity));
346             // no candidate is reachable so only remove owner if necessary
347             removeOwner(entity);
348             return;
349         }
350         ownerToEntity.put(pickedCandidate, entity);
351
352         LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
353         currentOwners.put(entity, pickedCandidate);
354         writeNewOwner(entity, pickedCandidate);
355     }
356
357     private void removeOwner(final DOMEntity entity) {
358         if (currentOwners.containsKey(entity)) {
359             // assign empty owner to dd, as we cannot delete data for a key since that would prevent
360             // writes for the same key
361             currentOwners.remove(entity);
362
363             writeNewOwner(entity, "");
364         }
365     }
366
367     private void writeNewOwner(final DOMEntity entity, final String candidate) {
368         ownerReplicator.askUpdate(
369                 askReplyTo -> new Replicator.Update<>(
370                         new LWWRegisterKey<>(entity.toString()),
371                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
372                         Replicator.writeLocal(),
373                         askReplyTo,
374                         register -> register.withValue(node, candidate, clock)),
375                 OwnerChanged::new);
376     }
377
378     private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
379         LOG.debug("Received MemberUp : {}", event);
380
381         handleReachableEvent(event.getRoles());
382         return this;
383     }
384
385     private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
386         LOG.debug("Received MemberReachable : {}", event);
387
388         handleReachableEvent(event.getRoles());
389         return this;
390     }
391
392     private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
393         request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
394         return this;
395     }
396
397     private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
398         final DOMEntity entity = extractEntity(request);
399         request.getReplyTo().tell(StatusReply.success(
400                 new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
401         return this;
402     }
403
404     private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
405         request.getReplyTo().tell(
406                 StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
407         return this;
408     }
409
410     private void handleReachableEvent(final Set<String> roles) {
411         if (roles.contains(dataCenter)) {
412             activeMembers.add(extractRole(roles));
413             assignMissingOwners();
414         } else {
415             LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
416         }
417     }
418
419     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
420         LOG.debug("Received MemberDown : {}", event);
421
422         handleUnreachableEvent(event.getRoles());
423         return this;
424     }
425
426     private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
427         LOG.debug("Received MemberUnreachable : {}", event);
428
429         handleUnreachableEvent(event.getRoles());
430         return this;
431     }
432
433     private void handleUnreachableEvent(final Set<String> roles) {
434         if (roles.contains(dataCenter)) {
435             activeMembers.remove(extractRole(roles));
436             reassignUnreachableOwners();
437         } else {
438             LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
439         }
440     }
441
442     private Set<String> getActiveMembers() {
443         final CurrentClusterState clusterState = cluster.state();
444         final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
445             .map(OwnerSupervisor::extractRole)
446             .collect(Collectors.toSet());
447
448         return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
449             // We are evaluating the set of roles for each member
450             .map(Member::getRoles)
451             // Filter out any members which do not share our dataCenter
452             .filter(roles -> roles.contains(dataCenter))
453             // Find first legal role
454             .map(OwnerSupervisor::extractRole)
455             // filter out unreachable roles
456             .filter(role -> !unreachableRoles.contains(role))
457             .collect(Collectors.toSet());
458     }
459
460     private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
461         final var name = request.getName();
462         final var iid = name.getInstanceIdentifier();
463         if (iid != null) {
464             return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
465         }
466         final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
467         return new DOMEntity(request.getType().getValue(), str);
468     }
469
470     private static String extractRole(final Member member) {
471         return extractRole(member.getRoles());
472     }
473
474     private static String extractRole(final Set<String> roles) {
475         return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
476                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
477     }
478
479     private static String extractDatacenterRole(final Member member) {
480         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
481                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
482     }
483
484     @Override
485     Logger getLogger() {
486         return LOG;
487     }
488 }