Fix clearing of candidates from previous iterations
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / candidate / CandidateRegistryInit.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.candidate;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.actor.typed.javadsl.StashBuffer;
17 import akka.cluster.Cluster;
18 import java.time.Duration;
19 import java.util.Set;
20 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
21 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
22 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
23 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
24 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
25 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
27 import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
28 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCommand> {
33
34     private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistryInit.class);
35
36     private static final String DATACENTER_PREFIX = "dc-";
37
38     private final StashBuffer<CandidateRegistryCommand> stash;
39     private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
40     private final String selfRole;
41
42     public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
43                                  final StashBuffer<CandidateRegistryCommand> stash,
44                                  final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
45         super(ctx);
46         this.stash = stash;
47         this.ownerSupervisor = ownerSupervisor;
48         this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
49
50         ctx.getSelf().tell(new RemovePreviousCandidates());
51
52         LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
53     }
54
55     public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
56         return Behaviors.withStash(100,
57                 stash ->
58                         Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
59     }
60
61     @Override
62     public Receive<CandidateRegistryCommand> createReceive() {
63         return newReceiveBuilder()
64                 .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
65                 .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
66                 .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
67                 .onMessage(RegisterCandidate.class, this::stashCommand)
68                 .onMessage(UnregisterCandidate.class, this::stashCommand)
69                 .build();
70     }
71
72     private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
73         LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
74                 command.getThrowable());
75         getContext().getSelf().tell(new RemovePreviousCandidates());
76         return this;
77     }
78
79     private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
80         LOG.debug("Sending RemovePreviousCandidates.");
81         getContext().ask(ClearCandidatesResponse.class,
82                 ownerSupervisor, Duration.ofSeconds(5),
83                 ref -> new ClearCandidatesForMember(ref, selfRole),
84                 (response, throwable) -> {
85                     if (response != null) {
86                         return new CandidateRemovalFinished();
87                     } else {
88                         return new CandidateRemovalFailed(throwable);
89                     }
90                 });
91
92         return this;
93     }
94
95     private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
96         LOG.debug("Stashing {}", command);
97         stash.stash(command);
98         return this;
99     }
100
101     private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
102         LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
103         return stash.unstashAll(CandidateRegistry.create());
104     }
105
106     private static String extractRole(final Set<String> roles) {
107         return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
108                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
109     }
110 }