Make Payload Serializable
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / CandidateCleaner.java
1 /*
2  * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORSet;
18 import akka.cluster.ddata.SelfUniqueAddress;
19 import akka.cluster.ddata.typed.javadsl.DistributedData;
20 import akka.cluster.ddata.typed.javadsl.Replicator;
21 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
22 import java.time.Duration;
23 import java.util.Map;
24 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
25 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
26 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesUpdateResponse;
27 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
28 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
29 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Actor that can be spawned by all the supervisor implementations that executes clearing of candidates once
35  * candidate retrieval succeeds. Once candidates for the member are cleared(or immediately if none need to be cleared),
36  * the actor stops itself.
37  */
38 public final class CandidateCleaner extends AbstractBehavior<OwnerSupervisorCommand> {
39     private static final Logger LOG = LoggerFactory.getLogger(CandidateCleaner.class);
40
41     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
42     private final SelfUniqueAddress node;
43
44     private int remaining = 0;
45
46     private CandidateCleaner(final ActorContext<OwnerSupervisorCommand> context) {
47         super(context);
48
49         final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
50         candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
51         node = DistributedData.get(context.getSystem()).selfUniqueAddress();
52
53     }
54
55     public static Behavior<OwnerSupervisorCommand> create() {
56         return Behaviors.setup(CandidateCleaner::new);
57     }
58
59     @Override
60     public Receive<OwnerSupervisorCommand> createReceive() {
61         return newReceiveBuilder()
62                 .onMessage(ClearCandidates.class, this::onClearCandidates)
63                 .onMessage(ClearCandidatesUpdateResponse.class, this::onClearCandidatesUpdateResponse)
64                 .build();
65     }
66
67     private Behavior<OwnerSupervisorCommand> onClearCandidates(final ClearCandidates command) {
68         LOG.debug("Clearing candidates for member: {}", command.getOriginalMessage().getCandidate());
69
70         final ORMap<DOMEntity, ORSet<String>> candidates =
71                 ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) command.getResponse())
72                         .get(CandidateRegistry.KEY);
73
74         for (final Map.Entry<DOMEntity, ORSet<String>> entry : candidates.getEntries().entrySet()) {
75             if (entry.getValue().contains(command.getOriginalMessage().getCandidate())) {
76                 LOG.debug("Removing {} from {}", command.getOriginalMessage().getCandidate(), entry.getKey());
77
78                 remaining++;
79                 candidateReplicator.askUpdate(
80                         askReplyTo -> new Replicator.Update<>(
81                                 CandidateRegistry.KEY,
82                                 ORMap.empty(),
83                                 new Replicator.WriteMajority(Duration.ofSeconds(10)),
84                                 askReplyTo,
85                                 map -> map.update(node, entry.getKey(), ORSet.empty(),
86                                         value -> value.remove(node, command.getOriginalMessage().getCandidate()))),
87                         updateResponse -> new ClearCandidatesUpdateResponse(updateResponse,
88                                 command.getOriginalMessage().getReplyTo()));
89             }
90         }
91
92         if (remaining == 0) {
93             LOG.debug("Did not clear any candidates for {}", command.getOriginalMessage().getCandidate());
94             command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
95             return Behaviors.stopped();
96         }
97         return this;
98     }
99
100     private Behavior<OwnerSupervisorCommand> onClearCandidatesUpdateResponse(
101             final ClearCandidatesUpdateResponse command) {
102         remaining--;
103         if (remaining == 0) {
104             LOG.debug("Last update response for candidate removal received, replying to: {}", command.getReplyTo());
105             command.getReplyTo().tell(new ClearCandidatesResponse());
106             return Behaviors.stopped();
107         } else {
108             LOG.debug("Have still {} outstanding requests after {}", remaining, command.getResponse());
109         }
110         return this;
111     }
112 }