16c2ab625830d80f70560498925862816461b4a7
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / candidate / CandidateRegistry.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.candidate;
9
10 import akka.actor.typed.Behavior;
11 import akka.actor.typed.javadsl.AbstractBehavior;
12 import akka.actor.typed.javadsl.ActorContext;
13 import akka.actor.typed.javadsl.Behaviors;
14 import akka.actor.typed.javadsl.Receive;
15 import akka.cluster.ddata.Key;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORMapKey;
18 import akka.cluster.ddata.ORSet;
19 import akka.cluster.ddata.SelfUniqueAddress;
20 import akka.cluster.ddata.typed.javadsl.DistributedData;
21 import akka.cluster.ddata.typed.javadsl.Replicator;
22 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
23 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
24 import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
25 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
27 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Actor responsible for handling registrations of candidates into distributed-data.
33  */
34 public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
35
36     private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
37
38     public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
39
40     private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
41     private final SelfUniqueAddress node;
42
43     private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
44                               final ReplicatorMessageAdapter<CandidateRegistryCommand,
45                                       ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
46         super(context);
47         this.replicatorAdapter = replicatorAdapter;
48
49         this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
50
51         LOG.debug("Candidate registry started");
52     }
53
54     public static Behavior<CandidateRegistryCommand> create() {
55         return Behaviors.setup(ctx ->
56                 DistributedData.withReplicatorMessageAdapter(
57                         (ReplicatorMessageAdapter<CandidateRegistryCommand,
58                                 ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
59                                         new CandidateRegistry(ctx, replicatorAdapter)));
60     }
61
62     @Override
63     public Receive<CandidateRegistryCommand> createReceive() {
64         return newReceiveBuilder()
65                 .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
66                 .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
67                 .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
68                 .build();
69     }
70
71     private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
72         LOG.debug("Registering candidate({}) for entity: {}",
73                 registerCandidate.getCandidate(), registerCandidate.getEntity());
74         replicatorAdapter.askUpdate(
75                 askReplyTo -> new Replicator.Update<>(
76                         KEY,
77                         ORMap.empty(),
78                         Replicator.writeLocal(),
79                         askReplyTo,
80                         map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
81                                 value -> value.add(node, registerCandidate.getCandidate()))),
82                 InternalUpdateResponse::new);
83         return this;
84     }
85
86     private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
87         LOG.debug("Removing candidate({}) from entity: {}",
88                 unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
89         replicatorAdapter.askUpdate(
90                 askReplyTo -> new Replicator.Update<>(
91                         KEY,
92                         ORMap.empty(),
93                         Replicator.writeLocal(),
94                         askReplyTo,
95                         map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
96                                 value -> value.remove(node, unregisterCandidate.getCandidate()))),
97                 InternalUpdateResponse::new);
98         return this;
99     }
100
101     private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
102         LOG.debug("Received update response: {}", updateResponse.getRsp());
103         return this;
104     }
105 }