Eliminate network-pcep-topology-config
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / PCEPTopologyProvider.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.channel.Channel;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.epoll.EpollChannelOption;
20 import java.util.Arrays;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.stream.Collectors;
24 import java.util.stream.Stream;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
30 import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
31 import org.opendaylight.mdsal.binding.api.RpcProviderService;
32 import org.opendaylight.protocol.pcep.PCEPCapability;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev181109.NetworkTopologyPcepProgrammingService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.NetworkTopologyPcepService;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
37 import org.opendaylight.yangtools.concepts.Registration;
38 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.common.Empty;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 final class PCEPTopologyProvider extends DefaultTopologyReference {
44     private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyProvider.class);
45
46     private final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier;
47     private final PCEPTopologyProviderDependencies dependencies;
48     private final InstructionScheduler scheduler;
49
50     // High-level state bits: currently running asynchronous operation, current configuration and the next configuration
51     // to apply after the async operation completes
52     @GuardedBy("this")
53     private ListenableFuture<?> asyncOperation;
54     @GuardedBy("this")
55     private PCEPTopologyConfiguration currentConfig;
56     @GuardedBy("this")
57     private Optional<PCEPTopologyConfiguration> nextConfig;
58
59     // Future indicating shutdown in progress
60     @GuardedBy("this")
61     private SettableFuture<Empty> stopFuture;
62
63     // Low-level state bits
64     @GuardedBy("this")
65     private ServerSessionManager manager;
66     @GuardedBy("this")
67     private PCEPStatefulPeerProposal proposal;
68     @GuardedBy("this")
69     private Channel channel;
70     @GuardedBy("this")
71     private Registration networkReg;
72     @GuardedBy("this")
73     private Registration elementReg;
74
75     PCEPTopologyProvider(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
76             final PCEPTopologyProviderDependencies dependencies, final InstructionScheduler scheduler) {
77         super(instanceIdentifier);
78         this.instanceIdentifier = requireNonNull(instanceIdentifier);
79         this.dependencies = requireNonNull(dependencies);
80         this.scheduler = requireNonNull(scheduler);
81     }
82
83     synchronized ListenableFuture<?> stop() {
84         if (stopFuture != null) {
85             // Already stopping, just return the future
86             return stopFuture;
87         }
88
89         stopFuture = SettableFuture.create();
90         applyConfiguration(null);
91         if (asyncOperation == null) {
92             stopFuture.set(Empty.value());
93         }
94         return stopFuture;
95     }
96
97     synchronized void updateConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
98         // FIXME: BGPCEP-960: this check should be a one-time thing in PCEPTopologyTracker startup once we have OSGi DS
99         final var effectiveConfig = dependencies.getPCEPDispatcher().getPCEPSessionNegotiatorFactory()
100             .getPCEPSessionProposalFactory().getCapabilities().stream()
101             .anyMatch(PCEPCapability::isStateful) ? newConfiguration : null;
102
103         applyConfiguration(effectiveConfig);
104     }
105
106     @Holding("this")
107     private void applyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
108         if (asyncOperation != null) {
109             LOG.debug("Topology Provider {} is undergoing reconfiguration, delaying reconfiguration", topologyId());
110             nextConfig = Optional.ofNullable(newConfiguration);
111         } else {
112             doApplyConfiguration(newConfiguration);
113         }
114     }
115
116     @Holding("this")
117     private void doApplyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
118         LOG.debug("Topology Provider {} applying configuration {}", topologyId(), newConfiguration);
119
120         // Perform obvious enable/disable operations
121         if (newConfiguration == null) {
122             if (currentConfig != null) {
123                 LOG.info("Topology Provider {} lost configuration, disabling it", topologyId());
124                 disable();
125             }
126             return;
127         }
128         if (currentConfig == null) {
129             LOG.info("Topology Provider {} received configuration, enabling it", topologyId());
130             enable(newConfiguration);
131             return;
132         }
133
134         // We need to perform a complete restart if the listen address changes
135         final var currentAddress = currentConfig.getAddress();
136         final var newAddress = newConfiguration.getAddress();
137         if (!currentAddress.equals(newAddress)) {
138             LOG.info("Topology Provider {} listen address changed from {} to {}, restarting", topologyId(),
139                 currentAddress, newAddress);
140             applyConfiguration(null);
141             applyConfiguration(newConfiguration);
142             return;
143         }
144
145         // TCP-MD5 configuration is propagated from the server channel to individual channels. For any node that has
146         // changed this configuration we need to tear down any existing session.
147         final var currentKeys = currentConfig.getKeys().asMap();
148         final var newKeys = newConfiguration.getKeys().asMap();
149         final var outdatedNodes = Stream.concat(currentKeys.keySet().stream(), newKeys.keySet().stream())
150             .distinct()
151             .filter(nodeId -> !Arrays.equals(currentKeys.get(nodeId), newKeys.get(nodeId)))
152             .collect(Collectors.toUnmodifiableList());
153
154         manager.setRpcTimeout(newConfiguration.getRpcTimeout());
155         if (!outdatedNodes.isEmpty()) {
156             LOG.info("Topology Provider {} updating {} TCP-MD5 keys", topologyId(), outdatedNodes.size());
157             if (channel.config().setOption(EpollChannelOption.TCP_MD5SIG, newKeys)) {
158                 manager.tearDownSessions(outdatedNodes);
159             } else {
160                 LOG.warn("Topology Provider {} failed to update TCP-MD5 keys", topologyId());
161             }
162         }
163
164         currentConfig = newConfiguration;
165         LOG.info("Topology Provider {} configuration updated", topologyId());
166     }
167
168     @Holding("this")
169     private void enable(final PCEPTopologyConfiguration newConfiguration) {
170         // Assert we are performing an asynchronous operation
171         final var future = startOperation();
172         currentConfig = newConfiguration;
173
174         // First start the manager
175         manager = new ServerSessionManager(instanceIdentifier, dependencies, newConfiguration.getRpcTimeout(),
176                 newConfiguration.getGraphKey());
177         final var managerStart = manager.start();
178         managerStart.addListener(() -> enableChannel(future, Futures.getUnchecked(managerStart)),
179             MoreExecutors.directExecutor());
180     }
181
182     private synchronized void enableChannel(final SettableFuture<Empty> future, final Boolean managerSuccess) {
183         if (!managerSuccess) {
184             manager = null;
185             currentConfig = null;
186             finishOperation(future);
187             return;
188         }
189
190         proposal = new PCEPStatefulPeerProposal(dependencies.getDataBroker(), instanceIdentifier);
191
192         LOG.info("PCEP Topology Provider {} starting server channel", topologyId());
193         final var channelFuture = dependencies.getPCEPDispatcher().createServer(
194             new PCEPDispatcherDependenciesImpl(manager, proposal, currentConfig));
195         channelFuture.addListener(ignored -> enableRPCs(future, channelFuture));
196     }
197
198     private synchronized void enableRPCs(final SettableFuture<Empty> future, final ChannelFuture channelFuture) {
199         final var channelFailure = channelFuture.cause();
200         if (channelFailure != null) {
201             LOG.error("Topology Provider {} failed to initialize server channel", topologyId(), channelFailure);
202             disableManager(future);
203             return;
204         }
205         channel = channelFuture.channel();
206
207         // Register RPCs
208         final RpcProviderService rpcRegistry = dependencies.getRpcProviderRegistry();
209         elementReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
210             new TopologyRPCs(manager), Set.of(instanceIdentifier));
211         networkReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
212             new TopologyProgramming(scheduler, manager), Set.of(instanceIdentifier));
213
214         // We are now completely initialized
215         LOG.info("PCEP Topology Provider {} enabled", topologyId());
216         finishOperation(future);
217     }
218
219     @Holding("this")
220     private void disable() {
221         // Unregister RPCs
222         if (networkReg != null) {
223             networkReg.close();
224             networkReg = null;
225         }
226         if (elementReg != null) {
227             elementReg.close();
228             elementReg = null;
229         }
230
231         // Assert we are performing an asynchronous operation
232         final var future = startOperation();
233
234         // Disable channel
235         final var channelFuture = channel.close();
236         channel = null;
237         channelFuture.addListener(ignored -> disableManager(future));
238     }
239
240     @Holding("this")
241     private void disableManager(final SettableFuture<Empty> future) {
242         proposal.close();
243         proposal = null;
244         final var managerStop = manager.stop();
245         manager = null;
246         managerStop.addListener(() -> finishStopManager(future), MoreExecutors.directExecutor());
247     }
248
249     private synchronized void finishStopManager(final SettableFuture<Empty> future) {
250         // We are now completely shut down
251         currentConfig = null;
252         finishOperation(future);
253     }
254
255     @Holding("this")
256     private SettableFuture<Empty> startOperation() {
257         verify(asyncOperation == null, "Operation %s has not finished yet", asyncOperation);
258         final var future = SettableFuture.<Empty>create();
259         asyncOperation = future;
260         return future;
261     }
262
263     @Holding("this")
264     private void finishOperation(final SettableFuture<Empty> future) {
265         asyncOperation = null;
266         future.set(Empty.value());
267
268         // Process next configuration change if there is one
269         if (nextConfig != null) {
270             final var config = nextConfig.orElse(null);
271             nextConfig = null;
272             doApplyConfiguration(config);
273             return;
274         }
275
276         // Check if we are shutting down
277         if (stopFuture != null) {
278             stopFuture.set(Empty.value());
279         }
280     }
281
282     private @NonNull String topologyId() {
283         return TopologyUtils.friendlyId(instanceIdentifier);
284     }
285 }