2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.registry.candidate;
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.actor.typed.javadsl.StashBuffer;
17 import akka.cluster.Cluster;
18 import java.time.Duration;
20 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
21 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
22 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
23 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
24 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
25 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
27 import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
28 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCommand> {
34 private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistryInit.class);
36 private static final String DATACENTER_PREFIX = "dc-";
38 private final StashBuffer<CandidateRegistryCommand> stash;
39 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
40 private final String selfRole;
42 public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
43 final StashBuffer<CandidateRegistryCommand> stash,
44 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
47 this.ownerSupervisor = ownerSupervisor;
48 this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
50 ctx.getSelf().tell(new RemovePreviousCandidates());
52 LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
55 public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
56 return Behaviors.withStash(100,
58 Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
62 public Receive<CandidateRegistryCommand> createReceive() {
63 return newReceiveBuilder()
64 .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
65 .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
66 .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
67 .onMessage(RegisterCandidate.class, this::stashCommand)
68 .onMessage(UnregisterCandidate.class, this::stashCommand)
72 private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
73 LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
74 command.getThrowable());
75 getContext().getSelf().tell(new RemovePreviousCandidates());
79 private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
80 LOG.debug("Sending RemovePreviousCandidates.");
81 getContext().ask(ClearCandidatesResponse.class,
82 ownerSupervisor, Duration.ofSeconds(5),
83 ref -> new ClearCandidatesForMember(ref, selfRole),
84 (response, throwable) -> {
85 if (response != null) {
86 return new CandidateRemovalFinished();
88 return new CandidateRemovalFailed(throwable);
95 private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
96 LOG.debug("Stashing {}", command);
101 private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
102 LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
103 return stash.unstashAll(CandidateRegistry.create());
106 private static String extractRole(final Set<String> roles) {
107 return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
108 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));