Expose entity details in MDSAL
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.time.Duration;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.function.Function;
27 import javax.annotation.PreDestroy;
28 import javax.inject.Inject;
29 import javax.inject.Singleton;
30 import org.opendaylight.controller.cluster.ActorSystemProvider;
31 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
32 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
33 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
34 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
35 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
38 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
39 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
42 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
43 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
44 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
45 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
55 import org.opendaylight.mdsal.binding.api.RpcProviderService;
56 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
57 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
58 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
59 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
60 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.OdlEntityOwnersService;
70 import org.opendaylight.yangtools.concepts.Registration;
71 import org.opendaylight.yangtools.yang.binding.RpcOutput;
72 import org.opendaylight.yangtools.yang.common.Empty;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
75 import org.osgi.service.component.annotations.Activate;
76 import org.osgi.service.component.annotations.Component;
77 import org.osgi.service.component.annotations.Deactivate;
78 import org.osgi.service.component.annotations.Reference;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 /**
83  * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
84  * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
85  * the appropriate owners.
86  */
87 @Singleton
88 @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
89 public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable,
90         OdlEntityOwnersService {
91     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
92     private static final String DATACENTER_PREFIX = "dc";
93     private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
94     private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10);
95
96     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
97     private final String localCandidate;
98     private final Scheduler scheduler;
99     private final String datacenter;
100
101     private final ActorRef<BootstrapCommand> bootstrap;
102     private final RunningContext runningContext;
103     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
104     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
105     private final ActorRef<StateCheckerCommand> ownerStateChecker;
106     protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
107
108     private Registration reg;
109
110     @VisibleForTesting
111     protected AkkaEntityOwnershipService(final ActorSystem actorSystem)
112             throws ExecutionException, InterruptedException {
113         final var typedActorSystem = Adapter.toTyped(actorSystem);
114         scheduler = typedActorSystem.scheduler();
115
116         final Cluster cluster = Cluster.get(typedActorSystem);
117         datacenter = cluster.selfMember().dataCenter();
118
119         localCandidate = cluster.selfMember().getRoles().stream()
120             .filter(role -> !role.contains(DATACENTER_PREFIX))
121             .findFirst()
122             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
123
124         bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
125
126         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
127                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
128         runningContext = ask.toCompletableFuture().get();
129
130         candidateRegistry = runningContext.getCandidateRegistry();
131         listenerRegistry = runningContext.getListenerRegistry();
132         ownerStateChecker = runningContext.getOwnerStateChecker();
133         ownerSupervisor = runningContext.getOwnerSupervisor();
134     }
135
136     @Inject
137     @Activate
138     public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
139             @Reference final RpcProviderService rpcProvider) throws ExecutionException, InterruptedException {
140         this(actorProvider.getActorSystem());
141
142         reg = rpcProvider.registerRpcImplementation(OdlEntityOwnersService.class, this);
143     }
144
145     @PreDestroy
146     @Deactivate
147     @Override
148     public void close() throws InterruptedException, ExecutionException {
149         if (reg != null) {
150             reg.close();
151             reg = null;
152         }
153         AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
154     }
155
156     @Override
157     public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
158             throws CandidateAlreadyRegisteredException {
159         if (!registeredEntities.add(entity)) {
160             throw new CandidateAlreadyRegisteredException(entity);
161         }
162
163         final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
164         LOG.debug("Registering candidate with message: {}", msg);
165         candidateRegistry.tell(msg);
166
167         return new CandidateRegistration(entity, this);
168     }
169
170     @Override
171     public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
172                                                                    final DOMEntityOwnershipListener listener) {
173         LOG.debug("Registering listener {} for type {}", listener, entityType);
174         listenerRegistry.tell(new RegisterListener(entityType, listener));
175
176         return new ListenerRegistration(listener, entityType, this);
177     }
178
179     @Override
180     public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
181         LOG.debug("Retrieving ownership state for {}", entity);
182
183         final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
184             replyTo -> new GetOwnershipState(entity, replyTo),
185             Duration.ofSeconds(5), scheduler);
186
187         final GetOwnershipStateReply reply;
188         try {
189             reply = result.toCompletableFuture().get();
190         } catch (final InterruptedException | ExecutionException exception) {
191             LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
192             return Optional.empty();
193         }
194
195         return Optional.ofNullable(reply.getOwnershipState());
196     }
197
198     @Override
199     public boolean isCandidateRegistered(final DOMEntity forEntity) {
200         return registeredEntities.contains(forEntity);
201     }
202
203     @Override
204     public ListenableFuture<Empty> activateDataCenter() {
205         LOG.debug("Activating datacenter: {}", datacenter);
206
207         return toListenableFuture("Activate",
208             AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
209     }
210
211     @Override
212     public ListenableFuture<Empty> deactivateDataCenter() {
213         LOG.debug("Deactivating datacenter: {}", datacenter);
214         return toListenableFuture("Deactivate",
215             AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
216     }
217
218
219     @Override
220     public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
221         return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
222             GetEntitiesReply::toOutput);
223     }
224
225     @Override
226     public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
227         return toRpcFuture(AskPattern.ask(ownerSupervisor,
228             (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
229             GetEntityReply::toOutput);
230     }
231
232     @Override
233     public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
234         return toRpcFuture(AskPattern.ask(ownerSupervisor,
235             (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
236             scheduler), GetEntityOwnerReply::toOutput);
237     }
238
239     void unregisterCandidate(final DOMEntity entity) {
240         LOG.debug("Unregistering candidate for {}", entity);
241
242         if (registeredEntities.remove(entity)) {
243             candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
244         }
245     }
246
247     void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
248         LOG.debug("Unregistering listener {} for type {}", listener, entityType);
249
250         listenerRegistry.tell(new UnregisterListener(entityType, listener));
251     }
252
253     @VisibleForTesting
254     RunningContext getRunningContext() {
255         return runningContext;
256     }
257
258     private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
259             final CompletionStage<R> stage, final Function<R, O> outputFunction) {
260
261         final SettableFuture<RpcResult<O>> future = SettableFuture.create();
262         stage.whenComplete((reply, failure) -> {
263             if (failure != null) {
264                 future.setException(failure);
265             } else {
266                 future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
267             }
268         });
269         return future;
270     }
271
272     private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
273         final SettableFuture<Empty> future = SettableFuture.create();
274         stage.whenComplete((reply, failure) -> {
275             if (failure != null) {
276                 LOG.warn("{} DataCenter failed", op, failure);
277                 future.setException(failure);
278             } else {
279                 LOG.debug("{} DataCenter successful", op);
280                 future.set(Empty.getInstance());
281             }
282         });
283         return future;
284     }
285 }