/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.eos.akka.owner.supervisor;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.SelfUniqueAddress;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
57 * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
58 * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
59 * On cluster up/down etc. events the owners are reassigned if possible.
61 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
63 private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
64 private static final String DATACENTER_PREFIX = "dc-";
66 private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
68 // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
69 // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
70 // running in a cluster-singleton
71 private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
73 private final Cluster cluster;
74 private final SelfUniqueAddress node;
75 private final String dataCenter;
77 private final Set<String> activeMembers;
79 // currently registered candidates
80 private final Map<DOMEntity, Set<String>> currentCandidates;
82 private final Map<DOMEntity, String> currentOwners;
83 // reverse lookup of owner to entity
84 private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
86 private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
87 final Map<DOMEntity, Set<String>> currentCandidates,
88 final Map<DOMEntity, String> currentOwners) {
91 final DistributedData distributedData = DistributedData.get(context.getSystem());
92 final ActorRef<Replicator.Command> replicator = distributedData.replicator();
94 cluster = Cluster.get(context.getSystem());
95 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
96 dataCenter = extractDatacenterRole(cluster.selfMember());
98 node = distributedData.selfUniqueAddress();
99 activeMembers = getActiveMembers();
101 this.currentCandidates = currentCandidates;
102 this.currentOwners = currentOwners;
104 for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
105 ownerToEntity.put(entry.getValue(), entry.getKey());
108 // check whether we have any unreachable/missing owners
109 reassignUnreachableOwners();
110 assignMissingOwners();
112 final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
113 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
114 if (event instanceof ClusterEvent.MemberUp) {
115 return new MemberUpEvent(event.member().address(), event.member().getRoles());
117 return new MemberDownEvent(event.member().address(), event.member().getRoles());
120 cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
122 final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
123 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
124 if (event instanceof ClusterEvent.ReachableMember) {
125 return new MemberReachableEvent(event.member().address(), event.member().getRoles());
127 return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
130 cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
132 new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
133 Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
135 LOG.debug("Owner Supervisor started");
138 public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
139 final Map<DOMEntity, String> currentOwners) {
140 return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
144 public Receive<OwnerSupervisorCommand> createReceive() {
145 return newReceiveBuilder()
146 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
147 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
148 .onMessage(MemberUpEvent.class, this::onPeerUp)
149 .onMessage(MemberDownEvent.class, this::onPeerDown)
150 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
151 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
155 private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
156 LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
157 return IdleSupervisor.create();
160 private void reassignUnreachableOwners() {
161 final Set<String> ownersToReassign = new HashSet<>();
162 for (final String owner : ownerToEntity.keys()) {
163 if (!activeMembers.contains(owner)) {
164 ownersToReassign.add(owner);
168 for (final String owner : ownersToReassign) {
169 reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
173 private void assignMissingOwners() {
174 for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
175 if (!currentOwners.containsKey(entry.getKey())) {
176 assignOwnerFor(entry.getKey());
181 private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
182 LOG.debug("onCandidatesChanged {}", message.getResponse());
183 if (message.getResponse() instanceof Replicator.Changed) {
184 final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
185 (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
186 processCandidateChanges(changed.get(CandidateRegistry.KEY));
191 private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
192 final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
193 for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
194 processCandidatesFor(entry.getKey(), entry.getValue());
198 private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
199 LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
201 final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
202 // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
204 if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
205 LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
206 currentCandidates.put(entity, new HashSet<>(candidates));
208 LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
209 assignOwnerFor(entity);
214 final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
215 final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
217 LOG.debug("currently present candidates: {}", currentlyPresent);
218 LOG.debug("difference: {}", difference);
220 final List<String> ownersToReassign = new ArrayList<>();
222 // first add/remove candidates from entities
223 for (final String toCheck : difference) {
224 if (!currentlyPresent.contains(toCheck)) {
226 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
227 currentCandidates.get(entity).add(toCheck);
229 if (!currentOwners.containsKey(entity)) {
230 // might as well assign right away when we don't have an owner
231 assignOwnerFor(entity);
234 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
238 if (!candidates.contains(toCheck)) {
240 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
241 currentCandidates.get(entity).remove(toCheck);
242 if (ownerToEntity.containsKey(toCheck)) {
243 ownersToReassign.add(toCheck);
248 // then reassign those that need new owners
249 for (final String toReassign : ownersToReassign) {
250 reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
253 if (currentCandidates.get(entity) == null) {
254 LOG.debug("Last candidate removed for {}", entity);
256 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
260 private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
261 LOG.debug("Reassigning owners for {}", entities);
262 for (final DOMEntity entity : entities) {
264 // only reassign owner for those entities that lost this candidate or is not reachable
265 if (!activeMembers.contains(oldOwner)
266 || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
267 ownerToEntity.remove(oldOwner, entity);
268 assignOwnerFor(entity);
273 private void assignOwnerFor(final DOMEntity entity) {
274 final Set<String> candidatesForEntity = currentCandidates.get(entity);
275 if (candidatesForEntity.isEmpty()) {
276 LOG.debug("No candidates present for entity: {}", entity);
281 String pickedCandidate = null;
282 for (final String candidate : candidatesForEntity) {
283 if (activeMembers.contains(candidate)) {
284 pickedCandidate = candidate;
288 if (pickedCandidate == null) {
289 LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
290 entity, activeMembers, currentCandidates.get(entity));
291 // no candidate is reachable so only remove owner if necessary
295 ownerToEntity.put(pickedCandidate, entity);
297 LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
298 currentOwners.put(entity, pickedCandidate);
299 writeNewOwner(entity, pickedCandidate);
302 private void removeOwner(final DOMEntity entity) {
303 if (currentOwners.containsKey(entity)) {
304 // assign empty owner to dd, as we cannot delete data for a key since that would prevent
305 // writes for the same key
306 currentOwners.remove(entity);
308 writeNewOwner(entity, "");
312 private void writeNewOwner(final DOMEntity entity, final String candidate) {
313 ownerReplicator.askUpdate(
314 askReplyTo -> new Replicator.Update<>(
315 new LWWRegisterKey<>(entity.toString()),
316 new LWWRegister<>(node.uniqueAddress(), candidate, 0),
317 Replicator.writeLocal(),
319 register -> register.withValue(node, candidate, clock)),
323 private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
324 LOG.debug("Received MemberUp : {}", event);
326 handleReachableEvent(event.getRoles());
330 private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
331 LOG.debug("Received MemberReachable : {}", event);
333 handleReachableEvent(event.getRoles());
337 private void handleReachableEvent(final Set<String> roles) {
338 if (roles.contains(dataCenter)) {
339 activeMembers.add(extractRole(roles));
340 assignMissingOwners();
342 LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
346 private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
347 LOG.debug("Received MemberDown : {}", event);
349 handleUnreachableEvent(event.getRoles());
353 private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
354 LOG.debug("Received MemberUnreachable : {}", event);
356 handleUnreachableEvent(event.getRoles());
360 private void handleUnreachableEvent(final Set<String> roles) {
361 if (roles.contains(dataCenter)) {
362 activeMembers.remove(extractRole(roles));
363 reassignUnreachableOwners();
365 LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
369 private Set<String> getActiveMembers() {
370 final Set<String> members = new HashSet<>();
371 cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
372 // filter out unreachable
373 members.removeAll(cluster.state().getUnreachable().stream()
374 .map(OwnerSupervisor::extractRole)
375 .collect(Collectors.toSet()));
377 // filter out members not from our datacenter
378 cluster.state().getMembers().forEach(member -> {
379 if (!member.roles().contains(dataCenter)) {
380 members.remove(extractRole(member));
387 private static String extractRole(final Member member) {
388 return extractRole(member.getRoles());
391 private static String extractRole(final Set<String> roles) {
392 return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
393 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
396 private String extractDatacenterRole(final Member member) {
397 return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
398 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));