34cfe78bad9fc915ae702137f40e0025e2de0b78
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / candidate / CandidateRegistryInit.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.candidate;
9
10 import akka.actor.typed.Behavior;
11 import akka.actor.typed.javadsl.AbstractBehavior;
12 import akka.actor.typed.javadsl.ActorContext;
13 import akka.actor.typed.javadsl.Behaviors;
14 import akka.actor.typed.javadsl.Receive;
15 import akka.actor.typed.javadsl.StashBuffer;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORSet;
18 import akka.cluster.ddata.SelfUniqueAddress;
19 import akka.cluster.ddata.typed.javadsl.DistributedData;
20 import akka.cluster.ddata.typed.javadsl.Replicator;
21 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
22 import akka.cluster.typed.Cluster;
23 import java.time.Duration;
24 import java.util.Map;
25 import java.util.Set;
26 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
27 import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
28 import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
29 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
30 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
31 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCommand> {
36
37     private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistryInit.class);
38
39     private static final String DATACENTER_PREFIX = "dc-";
40
41     private final StashBuffer<CandidateRegistryCommand> stash;
42     private final ReplicatorMessageAdapter<CandidateRegistryCommand,
43             ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
44     private final String selfRole;
45     private final SelfUniqueAddress node;
46
47     public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
48                                  final StashBuffer<CandidateRegistryCommand> stash,
49                                  final ReplicatorMessageAdapter<CandidateRegistryCommand,
50                                          ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
51         super(ctx);
52         this.stash = stash;
53         this.candidateReplicator = candidateReplicator;
54         selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
55
56         this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
57
58
59         this.candidateReplicator.askGet(
60                 askReplyTo -> new Replicator.Get<>(
61                         CandidateRegistry.KEY,
62                         new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
63                 InitialCandidateSync::new);
64
65         LOG.debug("CandidateRegistry syncing behavior started.");
66     }
67
68     public static Behavior<CandidateRegistryCommand> create() {
69         return Behaviors.withStash(100,
70                 stash ->
71                         Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
72                                 (ReplicatorMessageAdapter<CandidateRegistryCommand,
73                                         ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
74                                         new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
75     }
76
77     @Override
78     public Receive<CandidateRegistryCommand> createReceive() {
79         return newReceiveBuilder()
80                 .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
81                 .onMessage(RegisterCandidate.class, this::stashCommand)
82                 .onMessage(UnregisterCandidate.class, this::stashCommand)
83                 .build();
84     }
85
86     private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
87         stash.stash(command);
88         return this;
89     }
90
91     private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
92         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
93         if (response instanceof Replicator.GetSuccess) {
94             clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
95         }
96         // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
97         // from distributed data
98         return switchToCandidateRegistry();
99     }
100
101     private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
102         final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
103
104         for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
105             if (entry.getValue().getElements().contains(selfRole)) {
106                 LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
107                         selfRole, entry.getKey(), entry.getValue().getElements());
108                 clearRegistration(entry.getKey());
109             }
110         }
111     }
112
113     private void clearRegistration(final DOMEntity entity) {
114         candidateReplicator.askUpdate(
115                 askReplyTo -> new Replicator.Update<>(
116                         CandidateRegistry.KEY,
117                         ORMap.empty(),
118                         Replicator.writeLocal(),
119                         askReplyTo,
120                         map -> map.update(node, entity, ORSet.empty(),
121                                 value -> value.remove(node, selfRole))),
122                 InternalUpdateResponse::new);
123     }
124
125     private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
126         LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
127         return stash.unstashAll(CandidateRegistry.create());
128     }
129
130     private static String extractRole(final Set<String> roles) {
131         return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
132                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
133     }
134 }