/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.eos.akka.registry.candidate;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.Cluster;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORMapKey;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.SelfUniqueAddress;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import java.util.Set;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
34 * Actor responsible for handling registrations of candidates into distributed-data.
36 public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
38 private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
40 private static final String DATACENTER_PREFIX = "dc-";
42 public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
44 private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
45 private final SelfUniqueAddress node;
46 private final String selfRole;
48 private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
49 final ReplicatorMessageAdapter<CandidateRegistryCommand,
50 ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
52 this.replicatorAdapter = replicatorAdapter;
54 this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
55 this.selfRole = extractRole(Cluster.get(context.getSystem()).selfMember().getRoles());
57 LOG.debug("{} : Candidate registry started", selfRole);
60 public static Behavior<CandidateRegistryCommand> create() {
61 return Behaviors.setup(ctx ->
62 DistributedData.withReplicatorMessageAdapter(
63 (ReplicatorMessageAdapter<CandidateRegistryCommand,
64 ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
65 new CandidateRegistry(ctx, replicatorAdapter)));
69 public Receive<CandidateRegistryCommand> createReceive() {
70 return newReceiveBuilder()
71 .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
72 .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
73 .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
77 private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
78 LOG.debug("{} - Registering candidate({}) for entity: {}", selfRole,
79 registerCandidate.getCandidate(), registerCandidate.getEntity());
80 replicatorAdapter.askUpdate(
81 askReplyTo -> new Replicator.Update<>(
84 Replicator.writeLocal(),
86 map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
87 value -> value.add(node, registerCandidate.getCandidate()))),
88 InternalUpdateResponse::new);
92 private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
93 LOG.debug("{} - Removing candidate({}) from entity: {}", selfRole,
94 unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
95 replicatorAdapter.askUpdate(
96 askReplyTo -> new Replicator.Update<>(
99 Replicator.writeLocal(),
101 map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
102 value -> value.remove(node, unregisterCandidate.getCandidate()))),
103 InternalUpdateResponse::new);
107 private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
108 LOG.debug("{} : Received update response: {}", selfRole, updateResponse.getRsp());
112 private static String extractRole(final Set<String> roles) {
113 return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
114 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));