2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.time.Duration;
21 import java.util.Optional;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import javax.annotation.PreDestroy;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
31 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
32 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
33 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
34 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
38 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
39 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
41 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
42 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
43 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
44 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
45 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
46 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
47 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
48 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
49 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
50 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
51 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
53 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
54 import org.opendaylight.yangtools.yang.common.Empty;
55 import org.osgi.service.component.annotations.Activate;
56 import org.osgi.service.component.annotations.Component;
57 import org.osgi.service.component.annotations.Deactivate;
58 import org.osgi.service.component.annotations.Reference;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
64 * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
65 * the appropriate owners.
68 @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
69 public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable {
70 private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
71 private static final String DATACENTER_PREFIX = "dc";
72 private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
74 private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
75 private final String localCandidate;
76 private final Scheduler scheduler;
77 private final String datacenter;
79 private final ActorRef<BootstrapCommand> bootstrap;
80 private final RunningContext runningContext;
81 private final ActorRef<CandidateRegistryCommand> candidateRegistry;
82 private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
83 private final ActorRef<StateCheckerCommand> ownerStateChecker;
84 protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
/**
 * Starts the service on top of a classic actor system: spawns the {@code EOSBootstrap} actor and blocks until
 * it has answered with its {@link RunningContext}.
 *
 * @param actorSystem classic actor system to adapt and spawn the bootstrap actor in
 * @throws ExecutionException if the request for the running context completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting for the running context
 * @throws IllegalArgumentException if every role of the local cluster member contains the datacenter marker
 */
protected AkkaEntityOwnershipService(final ActorSystem actorSystem)
        throws ExecutionException, InterruptedException {
    final var typedSystem = Adapter.toTyped(actorSystem);
    scheduler = typedSystem.scheduler();

    final Cluster cluster = Cluster.get(typedSystem);
    datacenter = cluster.selfMember().dataCenter();

    // The first role that does not carry the datacenter marker becomes the local candidate name.
    final Optional<String> memberRole = cluster.selfMember().getRoles().stream()
        .filter(role -> !role.contains(DATACENTER_PREFIX))
        .findFirst();
    localCandidate = memberRole.orElseThrow(() -> new IllegalArgumentException("No valid role found."));

    bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");

    // Blocking wait: the remaining fields cannot be populated without the running context.
    final CompletionStage<RunningContext> contextStage = AskPattern.ask(bootstrap,
        GetRunningContext::new, Duration.ofSeconds(5), scheduler);
    final RunningContext context = contextStage.toCompletableFuture().get();
    runningContext = context;

    candidateRegistry = context.getCandidateRegistry();
    listenerRegistry = context.getListenerRegistry();
    ownerStateChecker = context.getOwnerStateChecker();
    ownerSupervisor = context.getOwnerSupervisor();
}
/**
 * Starts the service on the actor system exposed by the supplied provider.
 *
 * @param provider source of the classic actor system
 * @throws ExecutionException if the request for the running context completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting for the running context
 */
public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
        throws ExecutionException, InterruptedException {
    this(provider.getActorSystem());
}
122 public void close() throws InterruptedException, ExecutionException {
123 AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
127 public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
128 throws CandidateAlreadyRegisteredException {
129 if (!registeredEntities.add(entity)) {
130 throw new CandidateAlreadyRegisteredException(entity);
133 final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
134 LOG.debug("Registering candidate with message: {}", msg);
135 candidateRegistry.tell(msg);
137 return new CandidateRegistration(entity, this);
141 public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
142 final DOMEntityOwnershipListener listener) {
143 LOG.debug("Registering listener {} for type {}", listener, entityType);
144 listenerRegistry.tell(new RegisterListener(entityType, listener));
146 return new ListenerRegistration(listener, entityType, this);
150 public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
151 LOG.debug("Retrieving ownership state for {}", entity);
153 final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
154 replyTo -> new GetOwnershipState(entity, replyTo),
155 Duration.ofSeconds(5), scheduler);
157 final GetOwnershipStateReply reply;
159 reply = result.toCompletableFuture().get();
160 } catch (final InterruptedException | ExecutionException exception) {
161 LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
162 return Optional.empty();
165 return Optional.ofNullable(reply.getOwnershipState());
169 public boolean isCandidateRegistered(final DOMEntity forEntity) {
170 return registeredEntities.contains(forEntity);
174 public ListenableFuture<Empty> activateDataCenter() {
175 LOG.debug("Activating datacenter: {}", datacenter);
177 return toListenableFuture("Activate",
178 AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
182 public ListenableFuture<Empty> deactivateDataCenter() {
183 LOG.debug("Deactivating datacenter: {}", datacenter);
184 return toListenableFuture("Deactivate",
185 AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
188 void unregisterCandidate(final DOMEntity entity) {
189 LOG.debug("Unregistering candidate for {}", entity);
191 if (registeredEntities.remove(entity)) {
192 candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
196 void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
197 LOG.debug("Unregistering listener {} for type {}", listener, entityType);
199 listenerRegistry.tell(new UnregisterListener(entityType, listener));
203 RunningContext getRunningContext() {
204 return runningContext;
207 private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
208 final SettableFuture<Empty> future = SettableFuture.create();
209 stage.whenComplete((reply, failure) -> {
210 if (failure != null) {
211 LOG.warn("{} DataCenter has failed", op, failure);
212 future.setException(failure);
214 LOG.debug("{} DataCenter successful", op);
215 future.set(Empty.getInstance());