Fixup checkstyle
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / candidate / CandidateRegistry.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.candidate;
9
10 import akka.actor.typed.Behavior;
11 import akka.actor.typed.javadsl.AbstractBehavior;
12 import akka.actor.typed.javadsl.ActorContext;
13 import akka.actor.typed.javadsl.Behaviors;
14 import akka.actor.typed.javadsl.Receive;
15 import akka.cluster.Cluster;
16 import akka.cluster.ddata.Key;
17 import akka.cluster.ddata.ORMap;
18 import akka.cluster.ddata.ORMapKey;
19 import akka.cluster.ddata.ORSet;
20 import akka.cluster.ddata.SelfUniqueAddress;
21 import akka.cluster.ddata.typed.javadsl.DistributedData;
22 import akka.cluster.ddata.typed.javadsl.Replicator;
23 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
24 import java.util.Set;
25 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
27 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
28 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
29 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Actor responsible for handling registrations of candidates into distributed-data.
35  */
36 public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
37
38     private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
39
40     private static final String DATACENTER_PREFIX = "dc-";
41
42     public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
43
44     private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
45     private final SelfUniqueAddress node;
46     private final String selfRole;
47
48     private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
49                               final ReplicatorMessageAdapter<CandidateRegistryCommand,
50                                       ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
51         super(context);
52         this.replicatorAdapter = replicatorAdapter;
53
54         this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
55         this.selfRole = extractRole(Cluster.get(context.getSystem()).selfMember().getRoles());
56
57         LOG.debug("{} : Candidate registry started", selfRole);
58     }
59
60     public static Behavior<CandidateRegistryCommand> create() {
61         return Behaviors.setup(ctx ->
62                 DistributedData.withReplicatorMessageAdapter(
63                         (ReplicatorMessageAdapter<CandidateRegistryCommand,
64                                 ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
65                                         new CandidateRegistry(ctx, replicatorAdapter)));
66     }
67
68     @Override
69     public Receive<CandidateRegistryCommand> createReceive() {
70         return newReceiveBuilder()
71                 .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
72                 .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
73                 .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
74                 .build();
75     }
76
77     private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
78         LOG.debug("{} - Registering candidate({}) for entity: {}", selfRole,
79                 registerCandidate.getCandidate(), registerCandidate.getEntity());
80         replicatorAdapter.askUpdate(
81                 askReplyTo -> new Replicator.Update<>(
82                         KEY,
83                         ORMap.empty(),
84                         Replicator.writeLocal(),
85                         askReplyTo,
86                         map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
87                                 value -> value.add(node, registerCandidate.getCandidate()))),
88                 InternalUpdateResponse::new);
89         return this;
90     }
91
92     private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
93         LOG.debug("{} - Removing candidate({}) from entity: {}", selfRole,
94                 unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
95         replicatorAdapter.askUpdate(
96                 askReplyTo -> new Replicator.Update<>(
97                         KEY,
98                         ORMap.empty(),
99                         Replicator.writeLocal(),
100                         askReplyTo,
101                         map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
102                                 value -> value.remove(node, unregisterCandidate.getCandidate()))),
103                 InternalUpdateResponse::new);
104         return this;
105     }
106
107     private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
108         LOG.debug("{} : Received update response: {}", selfRole, updateResponse.getRsp());
109         return this;
110     }
111
112     private static String extractRole(final Set<String> roles) {
113         return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
114                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
115     }
116 }