2 * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.SelfUniqueAddress;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import java.time.Duration;
import java.util.Map;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesUpdateResponse;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
34 * Actor that can be spawned by all the supervisor implementations that executes clearing of candidates once
35 * candidate retrieval succeeds. Once candidates for the member are cleared(or immediately if none need to be cleared),
36 * the actor stops itself.
38 public final class CandidateCleaner extends AbstractBehavior<OwnerSupervisorCommand> {
39 private static final Logger LOG = LoggerFactory.getLogger(CandidateCleaner.class);
41 private final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
42 private final SelfUniqueAddress node;
44 private int remaining = 0;
46 private CandidateCleaner(final ActorContext<OwnerSupervisorCommand> context) {
49 final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
50 candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
51 node = DistributedData.get(context.getSystem()).selfUniqueAddress();
55 public static Behavior<OwnerSupervisorCommand> create() {
56 return Behaviors.setup(CandidateCleaner::new);
60 public Receive<OwnerSupervisorCommand> createReceive() {
61 return newReceiveBuilder()
62 .onMessage(ClearCandidates.class, this::onClearCandidates)
63 .onMessage(ClearCandidatesUpdateResponse.class, this::onClearCandidatesUpdateResponse)
67 private Behavior<OwnerSupervisorCommand> onClearCandidates(final ClearCandidates command) {
68 LOG.debug("Clearing candidates for member: {}", command.getOriginalMessage().getCandidate());
70 final ORMap<DOMEntity, ORSet<String>> candidates =
71 ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) command.getResponse())
72 .get(CandidateRegistry.KEY);
74 for (final Map.Entry<DOMEntity, ORSet<String>> entry : candidates.getEntries().entrySet()) {
75 if (entry.getValue().contains(command.getOriginalMessage().getCandidate())) {
76 LOG.debug("Removing {} from {}", command.getOriginalMessage().getCandidate(), entry.getKey());
79 candidateReplicator.askUpdate(
80 askReplyTo -> new Replicator.Update<>(
81 CandidateRegistry.KEY,
83 new Replicator.WriteMajority(Duration.ofSeconds(10)),
85 map -> map.update(node, entry.getKey(), ORSet.empty(),
86 value -> value.remove(node, command.getOriginalMessage().getCandidate()))),
87 updateResponse -> new ClearCandidatesUpdateResponse(updateResponse,
88 command.getOriginalMessage().getReplyTo()));
93 LOG.debug("Did not clear any candidates for {}", command.getOriginalMessage().getCandidate());
94 command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
95 return Behaviors.stopped();
100 private Behavior<OwnerSupervisorCommand> onClearCandidatesUpdateResponse(
101 final ClearCandidatesUpdateResponse command) {
103 if (remaining == 0) {
104 LOG.debug("Last update response for candidate removal received, replying to: {}", command.getReplyTo());
105 command.getReplyTo().tell(new ClearCandidatesResponse());
106 return Behaviors.stopped();
108 LOG.debug("Have still {} outstanding requests after {}", remaining, command.getResponse());