2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.owner.checker;
10 import static com.google.common.base.Verify.verifyNotNull;
12 import akka.actor.typed.ActorRef;
13 import akka.actor.typed.Behavior;
14 import akka.actor.typed.javadsl.AbstractBehavior;
15 import akka.actor.typed.javadsl.ActorContext;
16 import akka.actor.typed.javadsl.AskPattern;
17 import akka.actor.typed.javadsl.Behaviors;
18 import akka.actor.typed.javadsl.Receive;
19 import akka.cluster.ddata.LWWRegister;
20 import akka.cluster.ddata.LWWRegisterKey;
21 import akka.cluster.ddata.ORMap;
22 import akka.cluster.ddata.ORSet;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import java.time.Duration;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
32 import java.util.concurrent.CompletionStage;
33 import org.opendaylight.controller.eos.akka.owner.checker.command.AbstractEntityRequest;
34 import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidates;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidatesForEntity;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesReply;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
38 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
39 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
40 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
41 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
42 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnerForEntity;
43 import org.opendaylight.controller.eos.akka.owner.checker.command.OwnerDataResponse;
44 import org.opendaylight.controller.eos.akka.owner.checker.command.SingleEntityOwnerDataResponse;
45 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
53 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
54 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
55 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
60 * Short-lived actor that is spawned purely for execution of rpcs from the entity-owners model.
62 public final class EntityRpcHandler extends AbstractBehavior<StateCheckerCommand> {
63 private static final Logger LOG = LoggerFactory.getLogger(EntityRpcHandler.class);
64 private static final Duration ASK_TIMEOUT = Duration.ofSeconds(5);
66 private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
67 private final ReplicatorMessageAdapter<StateCheckerCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
69 private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
70 private final ActorRef<Replicator.Command> replicator;
72 private final BindingInstanceIdentifierCodec iidCodec;
74 private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
75 private final Map<DOMEntity, String> currentOwners = new HashMap<>();
76 private final Map<String, DOMEntity> entityLookup = new HashMap<>();
77 private int toSync = -1;
79 public EntityRpcHandler(final ActorContext<StateCheckerCommand> context,
80 final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
81 final BindingInstanceIdentifierCodec iidCodec) {
84 replicator = DistributedData.get(context.getSystem()).replicator();
85 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, ASK_TIMEOUT);
86 candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, ASK_TIMEOUT);
87 this.ownerSupervisor = ownerSupervisor;
89 this.iidCodec = iidCodec;
92 public static Behavior<StateCheckerCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
93 final BindingInstanceIdentifierCodec iidCodec) {
94 return Behaviors.setup(ctx -> new EntityRpcHandler(ctx, ownerSupervisor, iidCodec));
98 public Receive<StateCheckerCommand> createReceive() {
99 return newReceiveBuilder()
100 .onMessage(GetEntitiesRequest.class, this::onGetEntities)
101 .onMessage(GetEntityRequest.class, this::onGetEntity)
102 .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
103 .onMessage(GetCandidates.class, this::onCandidatesReceived)
104 .onMessage(GetCandidatesForEntity.class, this::onCandidatesForEntityReceived)
105 .onMessage(OwnerDataResponse.class, this::onOwnerDataReceived)
106 .onMessage(SingleEntityOwnerDataResponse.class, this::onSingleOwnerReceived)
107 .onMessage(GetOwnerForEntity.class, this::onReplyWithOwner)
111 private Behavior<StateCheckerCommand> onGetEntities(final GetEntitiesRequest request) {
112 LOG.debug("{} : Executing get-entities rpc.", getContext().getSelf());
113 final CompletionStage<GetEntitiesBackendReply> result = AskPattern.askWithStatus(
115 GetEntitiesBackendRequest::new,
117 getContext().getSystem().scheduler()
120 result.whenComplete((response, throwable) -> {
121 if (response != null) {
122 request.getReplyTo().tell(new GetEntitiesReply(response));
124 // retry backed with distributed-data
125 LOG.debug("{} : Get-entities failed with owner supervisor, falling back to distributed-data.",
126 getContext().getSelf(), throwable);
127 getCandidates(request.getReplyTo());
133 private Behavior<StateCheckerCommand> onGetEntity(final GetEntityRequest request) {
134 LOG.debug("{} : Executing get-entity rpc.", getContext().getSelf());
135 final CompletionStage<GetEntityBackendReply> result = AskPattern.askWithStatus(
137 replyTo -> new GetEntityBackendRequest(replyTo, request.getEntity()),
139 getContext().getSystem().scheduler()
142 result.whenComplete((response, throwable) -> {
143 if (response != null) {
144 request.getReplyTo().tell(new GetEntityReply(response));
146 // retry backed with distributed-data
147 LOG.debug("{} : Get-entity failed with owner supervisor, falling back to distributed-data.",
148 getContext().getSelf(), throwable);
149 getCandidatesForEntity(extractEntity(request), request.getReplyTo());
155 private Behavior<StateCheckerCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
156 LOG.debug("{} : Executing get-entity-owner rpc.", getContext().getSelf());
157 final CompletionStage<GetEntityOwnerBackendReply> result = AskPattern.askWithStatus(
159 replyTo -> new GetEntityOwnerBackendRequest(replyTo, request.getEntity()),
161 getContext().getSystem().scheduler()
164 result.whenComplete((response, throwable) -> {
165 if (response != null) {
166 request.getReplyTo().tell(new GetEntityOwnerReply(response.getOwner()));
168 // retry backed with distributed-data
169 LOG.debug("{} : Get-entity-owner failed with owner supervisor, falling back to distributed-data.",
170 getContext().getSelf(), throwable);
171 getOwnerForEntity(extractEntity(request), request.getReplyTo());
177 private void getCandidates(final ActorRef<GetEntitiesReply> replyTo) {
178 candidateReplicator.askGet(
179 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
180 replicatorResponse -> new GetCandidates(replicatorResponse, replyTo));
183 private void getCandidatesForEntity(final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
184 candidateReplicator.askGet(
185 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
186 replicatorResponse -> new GetCandidatesForEntity(replicatorResponse, entity, replyTo));
189 private void getOwnerForEntity(final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
190 ownerReplicator.askGet(
191 askReplyTo -> new Replicator.Get<>(
192 new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), askReplyTo),
193 replicatorReponse -> new GetOwnerForEntity(replicatorReponse, entity, replyTo));
196 private Behavior<StateCheckerCommand> onReplyWithOwner(final GetOwnerForEntity message) {
197 final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
198 if (response instanceof Replicator.GetSuccess) {
199 message.getReplyTo().tell(new GetEntityOwnerReply(
200 ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue()));
202 LOG.debug("Unable to retrieve owner for entity: {}, response: {}", message.getEntity(), response);
203 message.getReplyTo().tell(new GetEntityOwnerReply(""));
206 return Behaviors.stopped();
209 private Behavior<StateCheckerCommand> onCandidatesReceived(final GetCandidates message) {
210 final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
211 if (response instanceof Replicator.GetSuccess) {
212 return extractCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
213 message.getReplyTo());
216 LOG.debug("Unable to retrieve candidates from distributed-data. Response: {}", response);
217 message.getReplyTo().tell(new GetEntitiesReply(Collections.emptyMap(), Collections.emptyMap()));
218 return Behaviors.stopped();
221 private Behavior<StateCheckerCommand> extractCandidates(
222 final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response,
223 final ActorRef<GetEntitiesReply> replyTo) {
224 final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
225 candidates.getEntries().forEach((key, value) -> currentCandidates.put(key, new HashSet<>(value.getElements())));
227 toSync = candidates.keys().size();
228 for (final DOMEntity entity : candidates.keys().getElements()) {
229 entityLookup.put(entity.toString(), entity);
231 ownerReplicator.askGet(
232 askReplyTo -> new Replicator.Get<>(
233 new LWWRegisterKey<>(entity.toString()),
234 Replicator.readLocal(),
236 replicatorResponse -> new OwnerDataResponse(replicatorResponse, replyTo));
242 private Behavior<StateCheckerCommand> onOwnerDataReceived(final OwnerDataResponse message) {
243 final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
244 if (response instanceof Replicator.GetSuccess) {
245 handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
246 } else if (response instanceof Replicator.NotFound) {
247 handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
249 LOG.debug("Owner retrieval failed, response: {}", response);
252 // count the responses, on last respond to rpc and shutdown
255 final GetEntitiesReply getEntitiesReply = new GetEntitiesReply(currentCandidates, currentOwners);
256 message.getReplyTo().tell(getEntitiesReply);
257 return Behaviors.stopped();
263 private Behavior<StateCheckerCommand> onCandidatesForEntityReceived(final GetCandidatesForEntity message) {
264 LOG.debug("Received CandidatesForEntity: {}", message);
265 final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
266 if (response instanceof Replicator.GetSuccess) {
267 return extractCandidatesForEntity((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
268 message.getEntity(), message.getReplyTo());
270 LOG.debug("Unable to retrieve candidates for entity: {}. Response:: {}", message.getEntity(), response);
271 message.getReplyTo().tell(new GetEntityReply(null, Collections.emptySet()));
276 private Behavior<StateCheckerCommand> extractCandidatesForEntity(
277 final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response, final DOMEntity entity,
278 final ActorRef<GetEntityReply> replyTo) {
279 final Map<DOMEntity, ORSet<String>> entries = response.get(CandidateRegistry.KEY).getEntries();
280 currentCandidates.put(entity, entries.get(entity).getElements());
282 entityLookup.put(entity.toString(), entity);
283 ownerReplicator.askGet(
284 askReplyTo -> new Replicator.Get<>(
285 new LWWRegisterKey<>(entity.toString()),
286 Replicator.readLocal(),
288 replicatorResponse -> new SingleEntityOwnerDataResponse(replicatorResponse, entity, replyTo));
293 private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
294 final DOMEntity entity = entityLookup.get(rsp.key().id());
295 final String owner = rsp.get(rsp.key()).getValue();
297 currentOwners.put(entity, owner);
300 private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
301 LOG.debug("Owner not found. {}", rsp);
304 private Behavior<StateCheckerCommand> onSingleOwnerReceived(final SingleEntityOwnerDataResponse message) {
305 LOG.debug("Received owner for single entity: {}", message);
306 final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
307 final GetEntityReply reply;
308 if (response instanceof Replicator.GetSuccess) {
309 reply = new GetEntityReply(((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue(),
310 currentCandidates.get(message.getEntity()));
312 reply = new GetEntityReply(null, currentCandidates.get(message.getEntity()));
315 message.getReplyTo().tell(reply);
316 return Behaviors.stopped();
319 private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
320 final var name = request.getName();
321 final var iid = name.getInstanceIdentifier();
323 return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
325 final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
326 return new DOMEntity(request.getType().getValue(), str);