28ada5623087796c3682131a5d09cc657af1c3bc
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.time.Duration;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import javax.annotation.PreDestroy;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
31 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
32 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
33 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
34 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
38 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
39 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
42 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
43 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
44 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
45 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
46 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
47 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
48 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
49 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
50 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
51 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
52 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
53 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
54 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
55 import org.osgi.service.component.annotations.Activate;
56 import org.osgi.service.component.annotations.Component;
57 import org.osgi.service.component.annotations.Deactivate;
58 import org.osgi.service.component.annotations.Reference;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
64  * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
65  * the appropriate owners.
66  */
67 @Singleton
68 @Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class })
69 public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
70     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
71     private static final String DATACENTER_PREFIX = "dc";
72     private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
73
74     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
75     private final String localCandidate;
76     private final Scheduler scheduler;
77     private final String datacenter;
78
79     private final ActorRef<BootstrapCommand> bootstrap;
80     private final RunningContext runningContext;
81     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
82     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
83     private final ActorRef<StateCheckerCommand> ownerStateChecker;
84     private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
85
86     @VisibleForTesting
87     AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
88         final var typedActorSystem = Adapter.toTyped(actorSystem);
89         scheduler = typedActorSystem.scheduler();
90
91         final Cluster cluster = Cluster.get(typedActorSystem);
92         datacenter = cluster.selfMember().dataCenter();
93
94         localCandidate = cluster.selfMember().getRoles().stream()
95             .filter(role -> !role.contains(DATACENTER_PREFIX))
96             .findFirst()
97             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
98
99         bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
100
101         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
102                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
103         runningContext = ask.toCompletableFuture().get();
104
105         candidateRegistry = runningContext.getCandidateRegistry();
106         listenerRegistry = runningContext.getListenerRegistry();
107         ownerStateChecker = runningContext.getOwnerStateChecker();
108         ownerSupervisor = runningContext.getOwnerSupervisor();
109     }
110
111     @Inject
112     @Activate
113     public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
114             throws ExecutionException, InterruptedException {
115         this(provider.getActorSystem());
116     }
117
118     @PreDestroy
119     @Deactivate
120     @Override
121     public void close() throws InterruptedException, ExecutionException {
122         AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
123     }
124
125     @Override
126     public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
127             throws CandidateAlreadyRegisteredException {
128         if (!registeredEntities.add(entity)) {
129             throw new CandidateAlreadyRegisteredException(entity);
130         }
131
132         final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
133         LOG.debug("Registering candidate with message: {}", msg);
134         candidateRegistry.tell(msg);
135
136         return new CandidateRegistration(entity, this);
137     }
138
139     @Override
140     public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
141                                                                    final DOMEntityOwnershipListener listener) {
142         LOG.debug("Registering listener {} for type {}", listener, entityType);
143         listenerRegistry.tell(new RegisterListener(entityType, listener));
144
145         return new ListenerRegistration(listener, entityType, this);
146     }
147
148     @Override
149     public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
150         LOG.debug("Retrieving ownership state for {}", entity);
151
152         final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
153             replyTo -> new GetOwnershipState(entity, replyTo),
154             Duration.ofSeconds(5), scheduler);
155
156         final GetOwnershipStateReply reply;
157         try {
158             reply = result.toCompletableFuture().get();
159         } catch (final InterruptedException | ExecutionException exception) {
160             LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
161             return Optional.empty();
162         }
163
164         return Optional.ofNullable(reply.getOwnershipState());
165     }
166
167     @Override
168     public boolean isCandidateRegistered(final DOMEntity forEntity) {
169         return registeredEntities.contains(forEntity);
170     }
171
172     @Override
173     public ListenableFuture<Void> activateDataCenter() {
174         LOG.debug("Activating datacenter: {}", datacenter);
175         final SettableFuture<Void> future = SettableFuture.create();
176         final CompletionStage<OwnerSupervisorReply> ask =
177                 AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
178
179         ask.whenComplete((reply, failure) -> {
180             if (failure != null) {
181                 LOG.warn("Activate DataCenter has failed.", failure);
182                 future.setException(failure);
183                 return;
184             }
185
186             LOG.debug("Activate DataCenter successful.");
187             future.set(null);
188         });
189
190         return future;
191     }
192
193     @Override
194     public ListenableFuture<Void> deactivateDataCenter() {
195         LOG.debug("Deactivating datacenter: {}", datacenter);
196         final SettableFuture<Void> future = SettableFuture.create();
197         final CompletionStage<OwnerSupervisorReply> ask =
198                 AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
199
200         ask.whenComplete((reply, failure) -> {
201             if (failure != null) {
202                 LOG.warn("Deactivate DataCenter has failed.", failure);
203                 future.setException(failure);
204                 return;
205             }
206
207             LOG.debug("Deactivate DataCenter successful.");
208             future.set(null);
209         });
210
211         return future;
212     }
213
214     void unregisterCandidate(final DOMEntity entity) {
215         LOG.debug("Unregistering candidate for {}", entity);
216
217         if (registeredEntities.remove(entity)) {
218             candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
219         }
220     }
221
222     void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
223         LOG.debug("Unregistering listener {} for type {}", listener, entityType);
224
225         listenerRegistry.tell(new UnregisterListener(entityType, listener));
226     }
227
228     @VisibleForTesting
229     RunningContext getRunningContext() {
230         return runningContext;
231     }
232 }