Add an actor for entity rpc execution.
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / checker / EntityRpcHandler.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.checker;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11
12 import akka.actor.typed.ActorRef;
13 import akka.actor.typed.Behavior;
14 import akka.actor.typed.javadsl.AbstractBehavior;
15 import akka.actor.typed.javadsl.ActorContext;
16 import akka.actor.typed.javadsl.AskPattern;
17 import akka.actor.typed.javadsl.Behaviors;
18 import akka.actor.typed.javadsl.Receive;
19 import akka.cluster.ddata.LWWRegister;
20 import akka.cluster.ddata.LWWRegisterKey;
21 import akka.cluster.ddata.ORMap;
22 import akka.cluster.ddata.ORSet;
23 import akka.cluster.ddata.typed.javadsl.DistributedData;
24 import akka.cluster.ddata.typed.javadsl.Replicator;
25 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
26 import java.time.Duration;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.CompletionStage;
33 import org.opendaylight.controller.eos.akka.owner.checker.command.AbstractEntityRequest;
34 import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidates;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidatesForEntity;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesReply;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
38 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
39 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
40 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
41 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
42 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnerForEntity;
43 import org.opendaylight.controller.eos.akka.owner.checker.command.OwnerDataResponse;
44 import org.opendaylight.controller.eos.akka.owner.checker.command.SingleEntityOwnerDataResponse;
45 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
50 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
51 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
52 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
53 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
54 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
55 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Short-lived actor that is spawned purely for execution of rpcs from the entity-owners model.
61  */
62 public final class EntityRpcHandler extends AbstractBehavior<StateCheckerCommand> {
63     private static final Logger LOG = LoggerFactory.getLogger(EntityRpcHandler.class);
64     private static final Duration ASK_TIMEOUT = Duration.ofSeconds(5);
65
66     private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
67     private final ReplicatorMessageAdapter<StateCheckerCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
68
69     private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
70     private final ActorRef<Replicator.Command> replicator;
71
72     private final BindingInstanceIdentifierCodec iidCodec;
73
74     private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
75     private final Map<DOMEntity, String> currentOwners = new HashMap<>();
76     private final Map<String, DOMEntity> entityLookup = new HashMap<>();
77     private int toSync = -1;
78
79     public EntityRpcHandler(final ActorContext<StateCheckerCommand> context,
80                             final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
81                             final BindingInstanceIdentifierCodec iidCodec) {
82         super(context);
83
84         replicator = DistributedData.get(context.getSystem()).replicator();
85         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, ASK_TIMEOUT);
86         candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, ASK_TIMEOUT);
87         this.ownerSupervisor = ownerSupervisor;
88
89         this.iidCodec = iidCodec;
90     }
91
92     public static Behavior<StateCheckerCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
93                                                        final BindingInstanceIdentifierCodec iidCodec) {
94         return Behaviors.setup(ctx -> new EntityRpcHandler(ctx, ownerSupervisor, iidCodec));
95     }
96
97     @Override
98     public Receive<StateCheckerCommand> createReceive() {
99         return newReceiveBuilder()
100                 .onMessage(GetEntitiesRequest.class, this::onGetEntities)
101                 .onMessage(GetEntityRequest.class, this::onGetEntity)
102                 .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
103                 .onMessage(GetCandidates.class, this::onCandidatesReceived)
104                 .onMessage(GetCandidatesForEntity.class, this::onCandidatesForEntityReceived)
105                 .onMessage(OwnerDataResponse.class, this::onOwnerDataReceived)
106                 .onMessage(SingleEntityOwnerDataResponse.class, this::onSingleOwnerReceived)
107                 .onMessage(GetOwnerForEntity.class, this::onReplyWithOwner)
108                 .build();
109     }
110
111     private Behavior<StateCheckerCommand> onGetEntities(final GetEntitiesRequest request) {
112         LOG.debug("{} : Executing get-entities rpc.", getContext().getSelf());
113         final CompletionStage<GetEntitiesBackendReply> result = AskPattern.askWithStatus(
114                 ownerSupervisor,
115                 GetEntitiesBackendRequest::new,
116                 ASK_TIMEOUT,
117                 getContext().getSystem().scheduler()
118         );
119
120         result.whenComplete((response, throwable) -> {
121             if (response != null) {
122                 request.getReplyTo().tell(new GetEntitiesReply(response));
123             } else {
124                 // retry backed with distributed-data
125                 LOG.debug("{} : Get-entities failed with owner supervisor, falling back to distributed-data.",
126                         getContext().getSelf(), throwable);
127                 getCandidates(request.getReplyTo());
128             }
129         });
130         return this;
131     }
132
133     private Behavior<StateCheckerCommand> onGetEntity(final GetEntityRequest request) {
134         LOG.debug("{} : Executing get-entity rpc.", getContext().getSelf());
135         final CompletionStage<GetEntityBackendReply> result = AskPattern.askWithStatus(
136                 ownerSupervisor,
137                 replyTo -> new GetEntityBackendRequest(replyTo, request.getEntity()),
138                 ASK_TIMEOUT,
139                 getContext().getSystem().scheduler()
140         );
141
142         result.whenComplete((response, throwable) -> {
143             if (response != null) {
144                 request.getReplyTo().tell(new GetEntityReply(response));
145             } else {
146                 // retry backed with distributed-data
147                 LOG.debug("{} : Get-entity failed with owner supervisor, falling back to distributed-data.",
148                         getContext().getSelf(), throwable);
149                 getCandidatesForEntity(extractEntity(request), request.getReplyTo());
150             }
151         });
152         return this;
153     }
154
155     private Behavior<StateCheckerCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
156         LOG.debug("{} : Executing get-entity-owner rpc.", getContext().getSelf());
157         final CompletionStage<GetEntityOwnerBackendReply> result = AskPattern.askWithStatus(
158                 ownerSupervisor,
159                 replyTo -> new GetEntityOwnerBackendRequest(replyTo, request.getEntity()),
160                 ASK_TIMEOUT,
161                 getContext().getSystem().scheduler()
162         );
163
164         result.whenComplete((response, throwable) -> {
165             if (response != null) {
166                 request.getReplyTo().tell(new GetEntityOwnerReply(response.getOwner()));
167             } else {
168                 // retry backed with distributed-data
169                 LOG.debug("{} : Get-entity-owner failed with owner supervisor, falling back to distributed-data.",
170                         getContext().getSelf(), throwable);
171                 getOwnerForEntity(extractEntity(request), request.getReplyTo());
172             }
173         });
174         return this;
175     }
176
177     private void getCandidates(final ActorRef<GetEntitiesReply> replyTo) {
178         candidateReplicator.askGet(
179                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
180                 replicatorResponse -> new GetCandidates(replicatorResponse, replyTo));
181     }
182
183     private void getCandidatesForEntity(final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
184         candidateReplicator.askGet(
185                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
186                 replicatorResponse -> new GetCandidatesForEntity(replicatorResponse, entity, replyTo));
187     }
188
189     private void getOwnerForEntity(final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
190         ownerReplicator.askGet(
191                 askReplyTo -> new Replicator.Get<>(
192                         new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), askReplyTo),
193                 replicatorReponse -> new GetOwnerForEntity(replicatorReponse, entity, replyTo));
194     }
195
196     private Behavior<StateCheckerCommand> onReplyWithOwner(final GetOwnerForEntity message) {
197         final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
198         if (response instanceof Replicator.GetSuccess) {
199             message.getReplyTo().tell(new GetEntityOwnerReply(
200                     ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue()));
201         } else {
202             LOG.debug("Unable to retrieve owner for entity: {}, response: {}", message.getEntity(), response);
203             message.getReplyTo().tell(new GetEntityOwnerReply(""));
204         }
205
206         return Behaviors.stopped();
207     }
208
209     private Behavior<StateCheckerCommand> onCandidatesReceived(final GetCandidates message) {
210         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
211         if (response instanceof Replicator.GetSuccess) {
212             return extractCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
213                     message.getReplyTo());
214         }
215
216         LOG.debug("Unable to retrieve candidates from distributed-data. Response: {}", response);
217         message.getReplyTo().tell(new GetEntitiesReply(Collections.emptyMap(), Collections.emptyMap()));
218         return Behaviors.stopped();
219     }
220
221     private Behavior<StateCheckerCommand> extractCandidates(
222             final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response,
223             final ActorRef<GetEntitiesReply> replyTo) {
224         final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
225         candidates.getEntries().forEach((key, value) -> currentCandidates.put(key, new HashSet<>(value.getElements())));
226
227         toSync = candidates.keys().size();
228         for (final DOMEntity entity : candidates.keys().getElements()) {
229             entityLookup.put(entity.toString(), entity);
230
231             ownerReplicator.askGet(
232                     askReplyTo -> new Replicator.Get<>(
233                             new LWWRegisterKey<>(entity.toString()),
234                             Replicator.readLocal(),
235                             askReplyTo),
236                     replicatorResponse -> new OwnerDataResponse(replicatorResponse, replyTo));
237         }
238
239         return this;
240     }
241
242     private Behavior<StateCheckerCommand> onOwnerDataReceived(final OwnerDataResponse message) {
243         final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
244         if (response instanceof Replicator.GetSuccess) {
245             handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
246         } else if (response instanceof Replicator.NotFound) {
247             handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
248         } else {
249             LOG.debug("Owner retrieval failed, response: {}", response);
250         }
251
252         // count the responses, on last respond to rpc and shutdown
253         toSync--;
254         if (toSync == 0) {
255             final GetEntitiesReply getEntitiesReply = new GetEntitiesReply(currentCandidates, currentOwners);
256             message.getReplyTo().tell(getEntitiesReply);
257             return Behaviors.stopped();
258         }
259
260         return this;
261     }
262
263     private Behavior<StateCheckerCommand> onCandidatesForEntityReceived(final GetCandidatesForEntity message) {
264         LOG.debug("Received CandidatesForEntity: {}", message);
265         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
266         if (response instanceof Replicator.GetSuccess) {
267             return extractCandidatesForEntity((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
268                     message.getEntity(), message.getReplyTo());
269         } else {
270             LOG.debug("Unable to retrieve candidates for entity: {}. Response:: {}", message.getEntity(), response);
271             message.getReplyTo().tell(new GetEntityReply(null, Collections.emptySet()));
272             return this;
273         }
274     }
275
276     private Behavior<StateCheckerCommand> extractCandidatesForEntity(
277             final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response, final DOMEntity entity,
278             final ActorRef<GetEntityReply> replyTo) {
279         final Map<DOMEntity, ORSet<String>> entries = response.get(CandidateRegistry.KEY).getEntries();
280         currentCandidates.put(entity, entries.get(entity).getElements());
281
282         entityLookup.put(entity.toString(), entity);
283         ownerReplicator.askGet(
284                 askReplyTo -> new Replicator.Get<>(
285                         new LWWRegisterKey<>(entity.toString()),
286                         Replicator.readLocal(),
287                         askReplyTo),
288                 replicatorResponse -> new SingleEntityOwnerDataResponse(replicatorResponse, entity, replyTo));
289
290         return this;
291     }
292
293     private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
294         final DOMEntity entity = entityLookup.get(rsp.key().id());
295         final String owner = rsp.get(rsp.key()).getValue();
296
297         currentOwners.put(entity, owner);
298     }
299
300     private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
301         LOG.debug("Owner not found. {}", rsp);
302     }
303
304     private Behavior<StateCheckerCommand> onSingleOwnerReceived(final SingleEntityOwnerDataResponse message) {
305         LOG.debug("Received owner for single entity: {}", message);
306         final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
307         final GetEntityReply reply;
308         if (response instanceof Replicator.GetSuccess) {
309             reply = new GetEntityReply(((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue(),
310                     currentCandidates.get(message.getEntity()));
311         } else {
312             reply = new GetEntityReply(null, currentCandidates.get(message.getEntity()));
313         }
314
315         message.getReplyTo().tell(reply);
316         return Behaviors.stopped();
317     }
318
319     private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
320         final var name = request.getName();
321         final var iid = name.getInstanceIdentifier();
322         if (iid != null) {
323             return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
324         }
325         final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
326         return new DOMEntity(request.getType().getValue(), str);
327     }
328 }