2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.registry.candidate;
10 import akka.actor.typed.Behavior;
11 import akka.actor.typed.javadsl.AbstractBehavior;
12 import akka.actor.typed.javadsl.ActorContext;
13 import akka.actor.typed.javadsl.Behaviors;
14 import akka.actor.typed.javadsl.Receive;
15 import akka.cluster.ddata.Key;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORMapKey;
18 import akka.cluster.ddata.ORSet;
19 import akka.cluster.ddata.SelfUniqueAddress;
20 import akka.cluster.ddata.typed.javadsl.DistributedData;
21 import akka.cluster.ddata.typed.javadsl.Replicator;
22 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
23 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
24 import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
25 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
27 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
32 * Actor responsible for handling registrations of candidates into distributed-data.
34 public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
36 private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
38 public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
40 private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
41 private final SelfUniqueAddress node;
43 private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
44 final ReplicatorMessageAdapter<CandidateRegistryCommand,
45 ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
47 this.replicatorAdapter = replicatorAdapter;
49 this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
51 LOG.debug("Candidate registry started");
54 public static Behavior<CandidateRegistryCommand> create() {
55 return Behaviors.setup(ctx ->
56 DistributedData.withReplicatorMessageAdapter(
57 (ReplicatorMessageAdapter<CandidateRegistryCommand,
58 ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
59 new CandidateRegistry(ctx, replicatorAdapter)));
63 public Receive<CandidateRegistryCommand> createReceive() {
64 return newReceiveBuilder()
65 .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
66 .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
67 .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
71 private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
72 LOG.debug("Registering candidate({}) for entity: {}",
73 registerCandidate.getCandidate(), registerCandidate.getEntity());
74 replicatorAdapter.askUpdate(
75 askReplyTo -> new Replicator.Update<>(
78 Replicator.writeLocal(),
80 map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
81 value -> value.add(node, registerCandidate.getCandidate()))),
82 InternalUpdateResponse::new);
86 private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
87 LOG.debug("Removing candidate({}) from entity: {}",
88 unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
89 replicatorAdapter.askUpdate(
90 askReplyTo -> new Replicator.Update<>(
93 Replicator.writeLocal(),
95 map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
96 value -> value.remove(node, unregisterCandidate.getCandidate()))),
97 InternalUpdateResponse::new);
101 private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
102 LOG.debug("Received update response: {}", updateResponse.getRsp());