27fbad447413b0dfd70c2cbb6fc186ce26b6cbf7
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka;
9
10 import akka.actor.ActorSystem;
11 import akka.actor.typed.ActorRef;
12 import akka.actor.typed.Scheduler;
13 import akka.actor.typed.javadsl.Adapter;
14 import akka.actor.typed.javadsl.AskPattern;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.cluster.typed.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import java.time.Duration;
19 import java.util.Optional;
20 import java.util.Set;
21 import java.util.concurrent.CompletionStage;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import javax.annotation.PreDestroy;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.controller.cluster.ActorSystemProvider;
28 import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
29 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
30 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
31 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
32 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
33 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
34 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
36 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
37 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
38 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
39 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
40 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
41 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
42 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
43 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
44 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
45 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
46 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
47 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
48 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
49 import org.osgi.service.component.annotations.Activate;
50 import org.osgi.service.component.annotations.Component;
51 import org.osgi.service.component.annotations.Deactivate;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
58  * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
59  * the appropriate owners.
60  */
61 @Singleton
62 @Component(immediate = true, service = DOMEntityOwnershipService.class)
63 public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
64     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
65     private static final String DATACENTER_PREFIX = "dc";
66
67     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
68     private final String localCandidate;
69     private final Scheduler scheduler;
70
71     private final ActorRef<BootstrapCommand> bootstrap;
72     private final RunningContext runningContext;
73     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
74     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
75     private final ActorRef<StateCheckerCommand> ownerStateChecker;
76
77     @VisibleForTesting
78     AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
79         final var typedActorSystem = Adapter.toTyped(actorSystem);
80
81         scheduler = typedActorSystem.scheduler();
82         localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
83             .filter(role -> !role.contains(DATACENTER_PREFIX))
84             .findFirst()
85             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
86
87         bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
88
89         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
90                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
91         runningContext = ask.toCompletableFuture().get();
92
93         candidateRegistry = runningContext.getCandidateRegistry();
94         listenerRegistry = runningContext.getListenerRegistry();
95         ownerStateChecker = runningContext.getOwnerStateChecker();
96     }
97
98     @Inject
99     @Activate
100     public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
101             throws ExecutionException, InterruptedException {
102         this(provider.getActorSystem());
103     }
104
105     @PreDestroy
106     @Deactivate
107     @Override
108     public void close() throws InterruptedException, ExecutionException {
109         AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
110     }
111
112     @Override
113     public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
114             throws CandidateAlreadyRegisteredException {
115         if (!registeredEntities.add(entity)) {
116             throw new CandidateAlreadyRegisteredException(entity);
117         }
118
119         final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
120         LOG.debug("Registering candidate with message: {}", msg);
121         candidateRegistry.tell(msg);
122
123         return new CandidateRegistration(entity, this);
124     }
125
126     @Override
127     public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
128                                                                    final DOMEntityOwnershipListener listener) {
129         LOG.debug("Registering listener {} for type {}", listener, entityType);
130         listenerRegistry.tell(new RegisterListener(entityType, listener));
131
132         return new ListenerRegistration(listener, entityType, this);
133     }
134
135     @Override
136     public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
137         LOG.debug("Retrieving ownership state for {}", entity);
138
139         final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
140             replyTo -> new GetOwnershipState(entity, replyTo),
141             Duration.ofSeconds(5), scheduler);
142
143         final GetOwnershipStateReply reply;
144         try {
145             reply = result.toCompletableFuture().get();
146         } catch (final InterruptedException | ExecutionException exception) {
147             LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
148             return Optional.empty();
149         }
150
151         return Optional.ofNullable(reply.getOwnershipState());
152     }
153
154     @Override
155     public boolean isCandidateRegistered(final DOMEntity forEntity) {
156         return registeredEntities.contains(forEntity);
157     }
158
159     void unregisterCandidate(final DOMEntity entity) {
160         LOG.debug("Unregistering candidate for {}", entity);
161
162         if (registeredEntities.remove(entity)) {
163             candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
164         }
165     }
166
167     void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
168         LOG.debug("Unregistering listener {} for type {}", listener, entityType);
169
170         listenerRegistry.tell(new UnregisterListener(entityType, listener));
171     }
172
173     @VisibleForTesting
174     RunningContext getRunningContext() {
175         return runningContext;
176     }
177 }