/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.epoll.EpollChannelOption;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.protocol.pcep.PCEPCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev181109.NetworkTopologyPcepProgrammingService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.NetworkTopologyPcepService;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
43 final class PCEPTopologyProvider extends DefaultTopologyReference {
44 private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyProvider.class);
46 private final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier;
47 private final PCEPTopologyProviderDependencies dependencies;
48 private final InstructionScheduler scheduler;
50 // High-level state bits: currently running asynchronous operation, current configuration and the next configuration
51 // to apply after the async operation completes
53 private ListenableFuture<?> asyncOperation;
55 private PCEPTopologyConfiguration currentConfig;
57 private Optional<PCEPTopologyConfiguration> nextConfig;
59 // Future indicating shutdown in progress
61 private SettableFuture<Empty> stopFuture;
63 // Low-level state bits
65 private ServerSessionManager manager;
67 private PCEPStatefulPeerProposal proposal;
69 private Channel channel;
71 private Registration networkReg;
73 private Registration elementReg;
/**
 * Creates a provider for a single topology. The provider starts out disabled; nothing is started here.
 *
 * @param instanceIdentifier path of the topology this provider serves, also handed to the superclass
 * @param dependencies services used to create the server channel, data broker access and RPC registrations
 * @param scheduler instruction scheduler passed to the programming RPC implementation
 * @throws NullPointerException if any argument is null
 */
PCEPTopologyProvider(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
        final PCEPTopologyProviderDependencies dependencies, final InstructionScheduler scheduler) {
    super(instanceIdentifier);
    this.scheduler = requireNonNull(scheduler);
    this.dependencies = requireNonNull(dependencies);
    this.instanceIdentifier = requireNonNull(instanceIdentifier);
}
83 synchronized ListenableFuture<?> stop() {
84 if (stopFuture != null) {
85 // Already stopping, just return the future
89 stopFuture = SettableFuture.create();
90 applyConfiguration(null);
91 if (asyncOperation == null) {
92 stopFuture.set(Empty.value());
97 synchronized void updateConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
98 // FIXME: BGPCEP-960: this check should be a one-time thing in PCEPTopologyTracker startup once we have OSGi DS
99 final var effectiveConfig = dependencies.getPCEPDispatcher().getPCEPSessionNegotiatorFactory()
100 .getPCEPSessionProposalFactory().getCapabilities().stream()
101 .anyMatch(PCEPCapability::isStateful) ? newConfiguration : null;
103 applyConfiguration(effectiveConfig);
107 private void applyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
108 if (asyncOperation != null) {
109 LOG.debug("Topology Provider {} is undergoing reconfiguration, delaying reconfiguration", topologyId());
110 nextConfig = Optional.ofNullable(newConfiguration);
112 doApplyConfiguration(newConfiguration);
117 private void doApplyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
118 LOG.debug("Topology Provider {} applying configuration {}", topologyId(), newConfiguration);
120 // Perform obvious enable/disable operations
121 if (newConfiguration == null) {
122 if (currentConfig != null) {
123 LOG.info("Topology Provider {} lost configuration, disabling it", topologyId());
128 if (currentConfig == null) {
129 LOG.info("Topology Provider {} received configuration, enabling it", topologyId());
130 enable(newConfiguration);
134 // We need to perform a complete restart if the listen address changes
135 final var currentAddress = currentConfig.getAddress();
136 final var newAddress = newConfiguration.getAddress();
137 if (!currentAddress.equals(newAddress)) {
138 LOG.info("Topology Provider {} listen address changed from {} to {}, restarting", topologyId(),
139 currentAddress, newAddress);
140 applyConfiguration(null);
141 applyConfiguration(newConfiguration);
145 // TCP-MD5 configuration is propagated from the server channel to individual channels. For any node that has
146 // changed this configuration we need to tear down any existing session.
147 final var currentKeys = currentConfig.getKeys().asMap();
148 final var newKeys = newConfiguration.getKeys().asMap();
149 final var outdatedNodes = Stream.concat(currentKeys.keySet().stream(), newKeys.keySet().stream())
151 .filter(nodeId -> !Arrays.equals(currentKeys.get(nodeId), newKeys.get(nodeId)))
152 .collect(Collectors.toUnmodifiableList());
154 manager.setRpcTimeout(newConfiguration.getRpcTimeout());
155 if (!outdatedNodes.isEmpty()) {
156 LOG.info("Topology Provider {} updating {} TCP-MD5 keys", topologyId(), outdatedNodes.size());
157 if (channel.config().setOption(EpollChannelOption.TCP_MD5SIG, newKeys)) {
158 manager.tearDownSessions(outdatedNodes);
160 LOG.warn("Topology Provider {} failed to update TCP-MD5 keys", topologyId());
164 currentConfig = newConfiguration;
165 LOG.info("Topology Provider {} configuration updated", topologyId());
169 private void enable(final PCEPTopologyConfiguration newConfiguration) {
170 // Assert we are performing an asynchronous operation
171 final var future = startOperation();
172 currentConfig = newConfiguration;
174 // First start the manager
175 manager = new ServerSessionManager(instanceIdentifier, dependencies, newConfiguration.getRpcTimeout(),
176 newConfiguration.getGraphKey());
177 final var managerStart = manager.start();
178 managerStart.addListener(() -> enableChannel(future, Futures.getUnchecked(managerStart)),
179 MoreExecutors.directExecutor());
182 private synchronized void enableChannel(final SettableFuture<Empty> future, final Boolean managerSuccess) {
183 if (!managerSuccess) {
185 currentConfig = null;
186 finishOperation(future);
190 proposal = new PCEPStatefulPeerProposal(dependencies.getDataBroker(), instanceIdentifier);
192 LOG.info("PCEP Topology Provider {} starting server channel", topologyId());
193 final var channelFuture = dependencies.getPCEPDispatcher().createServer(
194 new PCEPDispatcherDependenciesImpl(manager, proposal, currentConfig));
195 channelFuture.addListener(ignored -> enableRPCs(future, channelFuture));
198 private synchronized void enableRPCs(final SettableFuture<Empty> future, final ChannelFuture channelFuture) {
199 final var channelFailure = channelFuture.cause();
200 if (channelFailure != null) {
201 LOG.error("Topology Provider {} failed to initialize server channel", topologyId(), channelFailure);
202 disableManager(future);
205 channel = channelFuture.channel();
208 final RpcProviderService rpcRegistry = dependencies.getRpcProviderRegistry();
209 elementReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
210 new TopologyRPCs(manager), Set.of(instanceIdentifier));
211 networkReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
212 new TopologyProgramming(scheduler, manager), Set.of(instanceIdentifier));
214 // We are now completely initialized
215 LOG.info("PCEP Topology Provider {} enabled", topologyId());
216 finishOperation(future);
220 private void disable() {
222 if (networkReg != null) {
226 if (elementReg != null) {
231 // Assert we are performing an asynchronous operation
232 final var future = startOperation();
235 final var channelFuture = channel.close();
237 channelFuture.addListener(ignored -> disableManager(future));
241 private void disableManager(final SettableFuture<Empty> future) {
244 final var managerStop = manager.stop();
246 managerStop.addListener(() -> finishStopManager(future), MoreExecutors.directExecutor());
249 private synchronized void finishStopManager(final SettableFuture<Empty> future) {
250 // We are now completely shut down
251 currentConfig = null;
252 finishOperation(future);
256 private SettableFuture<Empty> startOperation() {
257 verify(asyncOperation == null, "Operation %s has not finished yet", asyncOperation);
258 final var future = SettableFuture.<Empty>create();
259 asyncOperation = future;
264 private void finishOperation(final SettableFuture<Empty> future) {
265 asyncOperation = null;
266 future.set(Empty.value());
268 // Process next configuration change if there is one
269 if (nextConfig != null) {
270 final var config = nextConfig.orElse(null);
272 doApplyConfiguration(config);
276 // Check if we are shutting down
277 if (stopFuture != null) {
278 stopFuture.set(Empty.value());
282 private @NonNull String topologyId() {
283 return TopologyUtils.friendlyId(instanceIdentifier);