e4db596f8b187bbaded1aaf6ff68f0a16645d799
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.collect.ImmutableClassToInstanceMap;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.time.Duration;
23 import java.util.Optional;
24 import java.util.Set;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.function.Function;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.controller.cluster.ActorSystemProvider;
33 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
34 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
35 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
36 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
37 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
38 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
39 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
40 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
41 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
42 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
43 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
44 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
45 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
46 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
49 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
55 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
56 import org.opendaylight.mdsal.binding.api.RpcProviderService;
57 import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
58 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
59 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
60 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
63 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
64 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
65 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntities;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntity;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwner;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
75 import org.opendaylight.yangtools.concepts.Registration;
76 import org.opendaylight.yangtools.yang.binding.Rpc;
77 import org.opendaylight.yangtools.yang.binding.RpcOutput;
78 import org.opendaylight.yangtools.yang.common.Empty;
79 import org.opendaylight.yangtools.yang.common.RpcResult;
80 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
81 import org.osgi.service.component.annotations.Activate;
82 import org.osgi.service.component.annotations.Component;
83 import org.osgi.service.component.annotations.Deactivate;
84 import org.osgi.service.component.annotations.Reference;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 /**
89  * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
90  * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
91  * the appropriate owners.
92  */
93 @Singleton
94 @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
95 public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable {
96     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
97     private static final String DATACENTER_PREFIX = "dc";
98     private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
99     private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10);
100
101     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
102     private final String localCandidate;
103     private final Scheduler scheduler;
104     private final String datacenter;
105
106     private final ActorRef<BootstrapCommand> bootstrap;
107     private final RunningContext runningContext;
108     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
109     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
110     private final ActorRef<StateCheckerCommand> ownerStateChecker;
111     protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
112
113     private final BindingInstanceIdentifierCodec iidCodec;
114
115     private Registration reg;
116
117     @VisibleForTesting
118     protected AkkaEntityOwnershipService(final ActorSystem actorSystem, final BindingCodecTree codecTree)
119             throws ExecutionException, InterruptedException {
120         final var typedActorSystem = Adapter.toTyped(actorSystem);
121         scheduler = typedActorSystem.scheduler();
122
123         final Cluster cluster = Cluster.get(typedActorSystem);
124         datacenter = cluster.selfMember().dataCenter();
125
126         localCandidate = cluster.selfMember().getRoles().stream()
127             .filter(role -> !role.contains(DATACENTER_PREFIX))
128             .findFirst()
129             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
130
131         iidCodec = codecTree.getInstanceIdentifierCodec();
132         bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(
133                 context -> EOSMain.create(iidCodec)), "EOSBootstrap");
134
135         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
136                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
137         runningContext = ask.toCompletableFuture().get();
138
139         candidateRegistry = runningContext.getCandidateRegistry();
140         listenerRegistry = runningContext.getListenerRegistry();
141         ownerStateChecker = runningContext.getOwnerStateChecker();
142         ownerSupervisor = runningContext.getOwnerSupervisor();
143     }
144
145     @Inject
146     @Activate
147     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
148         justification = "Non-final for testing 'this' reference is expected to be stable at registration time")
149     public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
150             @Reference final RpcProviderService rpcProvider, @Reference final BindingCodecTree codecTree)
151             throws ExecutionException, InterruptedException {
152         this(actorProvider.getActorSystem(), codecTree);
153
154         reg = rpcProvider.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
155             .put(GetEntity.class, this::getEntity)
156             .put(GetEntities.class, this::getEntities)
157             .put(GetEntityOwner.class, this::getEntityOwner)
158             .build());
159     }
160
161     @PreDestroy
162     @Deactivate
163     @Override
164     public void close() throws InterruptedException, ExecutionException {
165         if (reg != null) {
166             reg.close();
167             reg = null;
168         }
169         AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
170     }
171
172     @Override
173     public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
174             throws CandidateAlreadyRegisteredException {
175         if (!registeredEntities.add(entity)) {
176             throw new CandidateAlreadyRegisteredException(entity);
177         }
178
179         final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
180         LOG.debug("Registering candidate with message: {}", msg);
181         candidateRegistry.tell(msg);
182
183         return new CandidateRegistration(entity, this);
184     }
185
186     @Override
187     public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
188                                                                    final DOMEntityOwnershipListener listener) {
189         LOG.debug("Registering listener {} for type {}", listener, entityType);
190         listenerRegistry.tell(new RegisterListener(entityType, listener));
191
192         return new ListenerRegistration(listener, entityType, this);
193     }
194
195     @Override
196     public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
197         LOG.debug("Retrieving ownership state for {}", entity);
198
199         final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
200             replyTo -> new GetOwnershipState(entity, replyTo),
201             Duration.ofSeconds(5), scheduler);
202
203         final GetOwnershipStateReply reply;
204         try {
205             reply = result.toCompletableFuture().get();
206         } catch (final InterruptedException | ExecutionException exception) {
207             LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
208             return Optional.empty();
209         }
210
211         return Optional.ofNullable(reply.getOwnershipState());
212     }
213
214     @Override
215     public boolean isCandidateRegistered(final DOMEntity forEntity) {
216         return registeredEntities.contains(forEntity);
217     }
218
219     @Override
220     public ListenableFuture<Empty> activateDataCenter() {
221         LOG.debug("Activating datacenter: {}", datacenter);
222
223         return toListenableFuture("Activate",
224             AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
225     }
226
227     @Override
228     public ListenableFuture<Empty> deactivateDataCenter() {
229         LOG.debug("Deactivating datacenter: {}", datacenter);
230         return toListenableFuture("Deactivate",
231             AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
232     }
233
234     @VisibleForTesting
235     final ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
236         return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
237                 reply -> reply.toOutput(iidCodec));
238     }
239
240     @VisibleForTesting
241     final ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
242         return toRpcFuture(AskPattern.ask(ownerStateChecker,
243             (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
244             GetEntityReply::toOutput);
245     }
246
247     @VisibleForTesting
248     final ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
249         return toRpcFuture(AskPattern.ask(ownerStateChecker,
250             (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
251             scheduler), GetEntityOwnerReply::toOutput);
252     }
253
254     void unregisterCandidate(final DOMEntity entity) {
255         LOG.debug("Unregistering candidate for {}", entity);
256
257         if (registeredEntities.remove(entity)) {
258             candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
259         }
260     }
261
262     void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
263         LOG.debug("Unregistering listener {} for type {}", listener, entityType);
264
265         listenerRegistry.tell(new UnregisterListener(entityType, listener));
266     }
267
268     @VisibleForTesting
269     RunningContext getRunningContext() {
270         return runningContext;
271     }
272
273     private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
274             final CompletionStage<R> stage, final Function<R, O> outputFunction) {
275
276         final SettableFuture<RpcResult<O>> future = SettableFuture.create();
277         stage.whenComplete((reply, failure) -> {
278             if (failure != null) {
279                 future.setException(failure);
280             } else {
281                 future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
282             }
283         });
284         return future;
285     }
286
287     private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
288         final SettableFuture<Empty> future = SettableFuture.create();
289         stage.whenComplete((reply, failure) -> {
290             if (failure != null) {
291                 LOG.warn("{} DataCenter failed", op, failure);
292                 future.setException(failure);
293             } else {
294                 LOG.debug("{} DataCenter successful", op);
295                 future.set(Empty.value());
296             }
297         });
298         return future;
299     }
300 }