2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.time.Duration;
22 import java.util.Optional;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.function.Function;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
33 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
34 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
35 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
36 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
38 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
39 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
40 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
41 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
42 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
43 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
44 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
45 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
46 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
47 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
48 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
49 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
50 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
51 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
52 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
53 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
54 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
55 import org.opendaylight.mdsal.binding.api.RpcProviderService;
56 import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
57 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
58 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
59 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
60 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
61 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntities;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntity;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwner;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
72 import org.opendaylight.yangtools.concepts.Registration;
73 import org.opendaylight.yangtools.yang.binding.RpcOutput;
74 import org.opendaylight.yangtools.yang.common.Empty;
75 import org.opendaylight.yangtools.yang.common.RpcResult;
76 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
77 import org.osgi.service.component.annotations.Activate;
78 import org.osgi.service.component.annotations.Component;
79 import org.osgi.service.component.annotations.Deactivate;
80 import org.osgi.service.component.annotations.Reference;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
85 * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
86 * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
87 * the appropriate owners.
90 @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
91 public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable {
92 private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
93 private static final String DATACENTER_PREFIX = "dc";
94 private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
95 private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10);
97 private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
98 private final String localCandidate;
99 private final Scheduler scheduler;
100 private final String datacenter;
102 private final ActorRef<BootstrapCommand> bootstrap;
103 private final RunningContext runningContext;
104 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
105 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
106 private final ActorRef<StateCheckerCommand> ownerStateChecker;
107 protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
109 private final BindingInstanceIdentifierCodec iidCodec;
111 private Registration reg;
/**
 * Starts the entity ownership actors on top of the supplied classic actor system.
 *
 * <p>The local candidate name is the first role of the local cluster member which does not contain
 * {@code DATACENTER_PREFIX}. The bootstrap actor is spawned under the name {@code EOSBootstrap} and is asked,
 * with a five second timeout, for the running context holding the remaining actor references.
 *
 * @param actorSystem classic actor system to spawn the bootstrap actor in
 * @param codecTree codec tree supplying the instance identifier codec
 * @throws ExecutionException if the bootstrap actor fails to provide its running context
 * @throws InterruptedException if interrupted while waiting for the running context
 * @throws IllegalArgumentException if the local member has no role usable as a candidate name
 */
protected AkkaEntityOwnershipService(final ActorSystem actorSystem, final BindingCodecTree codecTree)
        throws ExecutionException, InterruptedException {
    final var typedSystem = Adapter.toTyped(actorSystem);
    scheduler = typedSystem.scheduler();

    final Cluster cluster = Cluster.get(typedSystem);
    datacenter = cluster.selfMember().dataCenter();

    // Pick the first role that is not a datacenter marker
    String candidateName = null;
    for (final String role : cluster.selfMember().getRoles()) {
        if (!role.contains(DATACENTER_PREFIX)) {
            candidateName = role;
            break;
        }
    }
    if (candidateName == null) {
        throw new IllegalArgumentException("No valid role found.");
    }
    localCandidate = candidateName;

    final BindingInstanceIdentifierCodec codec = codecTree.getInstanceIdentifierCodec();
    iidCodec = codec;
    bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(ctx -> EOSMain.create(codec)), "EOSBootstrap");

    // Block until the bootstrap actor reports the actors it has started
    final CompletionStage<RunningContext> contextStage = AskPattern.ask(bootstrap,
        GetRunningContext::new, Duration.ofSeconds(5), scheduler);
    final RunningContext context = contextStage.toCompletableFuture().get();
    runningContext = context;

    candidateRegistry = context.getCandidateRegistry();
    listenerRegistry = context.getListenerRegistry();
    ownerStateChecker = context.getOwnerStateChecker();
    ownerSupervisor = context.getOwnerSupervisor();
}
143 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
144 justification = "Non-final for testing 'this' reference is expected to be stable at registration time")
145 public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
146 @Reference final RpcProviderService rpcProvider, @Reference final BindingCodecTree codecTree)
147 throws ExecutionException, InterruptedException {
148 this(actorProvider.getActorSystem(), codecTree);
150 reg = rpcProvider.registerRpcImplementations(
151 (GetEntity) this::getEntity,
152 (GetEntities) this::getEntities,
153 (GetEntityOwner) this::getEntityOwner);
159 public void close() throws InterruptedException, ExecutionException {
164 AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
168 public Registration registerCandidate(final DOMEntity entity)
169 throws CandidateAlreadyRegisteredException {
170 if (!registeredEntities.add(entity)) {
171 throw new CandidateAlreadyRegisteredException(entity);
174 final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
175 LOG.debug("Registering candidate with message: {}", msg);
176 candidateRegistry.tell(msg);
178 return new CandidateRegistration(entity, this);
182 public Registration registerListener(final String entityType, final DOMEntityOwnershipListener listener) {
183 LOG.debug("Registering listener {} for type {}", listener, entityType);
184 listenerRegistry.tell(new RegisterListener(entityType, listener));
186 return new ListenerRegistration(listener, entityType, this);
190 public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
191 LOG.debug("Retrieving ownership state for {}", entity);
193 final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
194 replyTo -> new GetOwnershipState(entity, replyTo),
195 Duration.ofSeconds(5), scheduler);
197 final GetOwnershipStateReply reply;
199 reply = result.toCompletableFuture().get();
200 } catch (final InterruptedException | ExecutionException exception) {
201 LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
202 return Optional.empty();
205 return Optional.ofNullable(reply.getOwnershipState());
209 public boolean isCandidateRegistered(final DOMEntity forEntity) {
210 return registeredEntities.contains(forEntity);
214 public ListenableFuture<Empty> activateDataCenter() {
215 LOG.debug("Activating datacenter: {}", datacenter);
217 return toListenableFuture("Activate",
218 AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
222 public ListenableFuture<Empty> deactivateDataCenter() {
223 LOG.debug("Deactivating datacenter: {}", datacenter);
224 return toListenableFuture("Deactivate",
225 AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
229 final ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
230 return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
231 reply -> reply.toOutput(iidCodec));
235 final ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
236 return toRpcFuture(AskPattern.ask(ownerStateChecker,
237 (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
238 GetEntityReply::toOutput);
242 final ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
243 return toRpcFuture(AskPattern.ask(ownerStateChecker,
244 (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
245 scheduler), GetEntityOwnerReply::toOutput);
248 void unregisterCandidate(final DOMEntity entity) {
249 LOG.debug("Unregistering candidate for {}", entity);
251 if (registeredEntities.remove(entity)) {
252 candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
256 void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
257 LOG.debug("Unregistering listener {} for type {}", listener, entityType);
259 listenerRegistry.tell(new UnregisterListener(entityType, listener));
263 RunningContext getRunningContext() {
264 return runningContext;
267 private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
268 final CompletionStage<R> stage, final Function<R, O> outputFunction) {
270 final SettableFuture<RpcResult<O>> future = SettableFuture.create();
271 stage.whenComplete((reply, failure) -> {
272 if (failure != null) {
273 future.setException(failure);
275 future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
281 private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
282 final SettableFuture<Empty> future = SettableFuture.create();
283 stage.whenComplete((reply, failure) -> {
284 if (failure != null) {
285 LOG.warn("{} DataCenter failed", op, failure);
286 future.setException(failure);
288 LOG.debug("{} DataCenter successful", op);
289 future.set(Empty.value());