/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.eos.akka.owner.supervisor;

import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.SelfUniqueAddress;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.pattern.StatusReply;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

72 * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
73 * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
74 * On cluster up/down etc. events the owners are reassigned if possible.
76 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
78 private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
79 private static final String DATACENTER_PREFIX = "dc-";
81 private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
83 // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
84 // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
85 // running in a cluster-singleton
86 private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
88 private final Cluster cluster;
89 private final SelfUniqueAddress node;
90 private final String dataCenter;
92 private final Set<String> activeMembers;
94 // currently registered candidates
95 private final Map<DOMEntity, Set<String>> currentCandidates;
97 private final Map<DOMEntity, String> currentOwners;
98 // reverse lookup of owner to entity
99 private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
101 // only reassign owner for those entities that lost this candidate or is not reachable
102 private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
103 !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
105 private final BindingInstanceIdentifierCodec iidCodec;
107 private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
108 final Map<DOMEntity, Set<String>> currentCandidates,
109 final Map<DOMEntity, String> currentOwners,
110 final BindingInstanceIdentifierCodec iidCodec) {
112 this.iidCodec = requireNonNull(iidCodec);
114 final DistributedData distributedData = DistributedData.get(context.getSystem());
115 final ActorRef<Replicator.Command> replicator = distributedData.replicator();
117 cluster = Cluster.get(context.getSystem());
118 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
119 dataCenter = extractDatacenterRole(cluster.selfMember());
121 node = distributedData.selfUniqueAddress();
122 activeMembers = getActiveMembers();
124 this.currentCandidates = currentCandidates;
125 this.currentOwners = currentOwners;
127 for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
128 ownerToEntity.put(entry.getValue(), entry.getKey());
131 // check whether we have any unreachable/missing owners
132 reassignUnreachableOwners();
133 assignMissingOwners();
135 final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
136 context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
137 if (event instanceof ClusterEvent.MemberUp) {
138 return new MemberUpEvent(event.member().address(), event.member().getRoles());
140 return new MemberDownEvent(event.member().address(), event.member().getRoles());
143 cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
145 final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
146 context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
147 if (event instanceof ClusterEvent.ReachableMember) {
148 return new MemberReachableEvent(event.member().address(), event.member().getRoles());
150 return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
153 cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
155 new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
156 Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
158 LOG.debug("Owner Supervisor started");
161 public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
162 final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
163 return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
167 public Receive<OwnerSupervisorCommand> createReceive() {
168 return newReceiveBuilder()
169 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
170 .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
171 .onMessage(OwnerChanged.class, this::onOwnerChanged)
172 .onMessage(MemberUpEvent.class, this::onPeerUp)
173 .onMessage(MemberDownEvent.class, this::onPeerDown)
174 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
175 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
176 .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
177 .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
178 .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
182 private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
183 LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
184 command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
185 return IdleSupervisor.create(iidCodec);
188 private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
189 LOG.debug("Owner has changed for {}", command.getResponse().key());
193 private void reassignUnreachableOwners() {
194 final Set<String> ownersToReassign = new HashSet<>();
195 for (final String owner : ownerToEntity.keys()) {
196 if (!activeMembers.contains(owner)) {
197 ownersToReassign.add(owner);
201 for (final String owner : ownersToReassign) {
202 reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
206 private void assignMissingOwners() {
207 for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
208 if (!currentOwners.containsKey(entry.getKey())) {
209 assignOwnerFor(entry.getKey());
214 private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
215 LOG.debug("onCandidatesChanged {}", message.getResponse());
216 if (message.getResponse() instanceof Replicator.Changed) {
217 final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
218 (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
219 processCandidateChanges(changed.get(CandidateRegistry.KEY));
224 private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
225 final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
226 for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
227 processCandidatesFor(entry.getKey(), entry.getValue());
231 private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
232 LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
234 final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
235 // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
237 if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
238 LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
239 currentCandidates.put(entity, new HashSet<>(candidates));
241 LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
242 assignOwnerFor(entity);
247 final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
248 final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
250 LOG.debug("currently present candidates: {}", currentlyPresent);
251 LOG.debug("difference: {}", difference);
253 final List<String> ownersToReassign = new ArrayList<>();
255 // first add/remove candidates from entities
256 for (final String toCheck : difference) {
257 if (!currentlyPresent.contains(toCheck)) {
259 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
260 currentCandidates.get(entity).add(toCheck);
262 if (!currentOwners.containsKey(entity)) {
263 // might as well assign right away when we don't have an owner
264 assignOwnerFor(entity);
267 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
271 if (!candidates.contains(toCheck)) {
273 LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
274 currentCandidates.get(entity).remove(toCheck);
275 if (ownerToEntity.containsKey(toCheck)) {
276 ownersToReassign.add(toCheck);
281 // then reassign those that need new owners
282 for (final String toReassign : ownersToReassign) {
283 reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
287 if (currentCandidates.get(entity) == null) {
288 LOG.debug("Last candidate removed for {}", entity);
290 LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
294 private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
295 final BiPredicate<DOMEntity, String> predicate) {
296 LOG.debug("Reassigning owners for {}", entities);
297 for (final DOMEntity entity : entities) {
298 if (predicate.test(entity, oldOwner)) {
299 ownerToEntity.remove(oldOwner, entity);
300 assignOwnerFor(entity);
305 private boolean isActiveCandidate(final String candidate) {
306 return activeMembers.contains(candidate);
309 private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
310 return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
313 private void assignOwnerFor(final DOMEntity entity) {
314 final Set<String> candidatesForEntity = currentCandidates.get(entity);
315 if (candidatesForEntity.isEmpty()) {
316 LOG.debug("No candidates present for entity: {}", entity);
321 String pickedCandidate = null;
322 for (final String candidate : candidatesForEntity) {
323 if (activeMembers.contains(candidate)) {
324 pickedCandidate = candidate;
328 if (pickedCandidate == null) {
329 LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
330 entity, activeMembers, currentCandidates.get(entity));
331 // no candidate is reachable so only remove owner if necessary
335 ownerToEntity.put(pickedCandidate, entity);
337 LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
338 currentOwners.put(entity, pickedCandidate);
339 writeNewOwner(entity, pickedCandidate);
342 private void removeOwner(final DOMEntity entity) {
343 if (currentOwners.containsKey(entity)) {
344 // assign empty owner to dd, as we cannot delete data for a key since that would prevent
345 // writes for the same key
346 currentOwners.remove(entity);
348 writeNewOwner(entity, "");
352 private void writeNewOwner(final DOMEntity entity, final String candidate) {
353 ownerReplicator.askUpdate(
354 askReplyTo -> new Replicator.Update<>(
355 new LWWRegisterKey<>(entity.toString()),
356 new LWWRegister<>(node.uniqueAddress(), candidate, 0),
357 Replicator.writeLocal(),
359 register -> register.withValue(node, candidate, clock)),
363 private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
364 LOG.debug("Received MemberUp : {}", event);
366 handleReachableEvent(event.getRoles());
370 private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
371 LOG.debug("Received MemberReachable : {}", event);
373 handleReachableEvent(event.getRoles());
377 private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
378 request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
382 private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
383 final DOMEntity entity = extractEntity(request);
384 request.getReplyTo().tell(StatusReply.success(
385 new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
389 private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
390 request.getReplyTo().tell(
391 StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
395 private void handleReachableEvent(final Set<String> roles) {
396 if (roles.contains(dataCenter)) {
397 activeMembers.add(extractRole(roles));
398 assignMissingOwners();
400 LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
404 private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
405 LOG.debug("Received MemberDown : {}", event);
407 handleUnreachableEvent(event.getRoles());
411 private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
412 LOG.debug("Received MemberUnreachable : {}", event);
414 handleUnreachableEvent(event.getRoles());
418 private void handleUnreachableEvent(final Set<String> roles) {
419 if (roles.contains(dataCenter)) {
420 activeMembers.remove(extractRole(roles));
421 reassignUnreachableOwners();
423 LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
427 private Set<String> getActiveMembers() {
428 final CurrentClusterState clusterState = cluster.state();
429 final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
430 .map(OwnerSupervisor::extractRole)
431 .collect(Collectors.toSet());
433 return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
434 // We are evaluating the set of roles for each member
435 .map(Member::getRoles)
436 // Filter out any members which do not share our dataCenter
437 .filter(roles -> roles.contains(dataCenter))
438 // Find first legal role
439 .map(OwnerSupervisor::extractRole)
440 // filter out unreachable roles
441 .filter(role -> !unreachableRoles.contains(role))
442 .collect(Collectors.toSet());
445 private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
446 final var name = request.getName();
447 final var iid = name.getInstanceIdentifier();
449 return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
451 final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
452 return new DOMEntity(request.getType().getValue(), str);
455 private static String extractRole(final Member member) {
456 return extractRole(member.getRoles());
459 private static String extractRole(final Set<String> roles) {
460 return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
461 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
464 private static String extractDatacenterRole(final Member member) {
465 return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
466 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));