*/
package org.opendaylight.bgpcep.pcep.topology.provider;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import java.util.List;
+import io.netty.channel.epoll.EpollChannelOption;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyConfiguration;
-import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyProviderDependencies;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
-import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.protocol.pcep.PCEPCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev181109.NetworkTopologyPcepProgrammingService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.NetworkTopologyPcepService;
-import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public final class PCEPTopologyProvider extends DefaultTopologyReference {
- private final ServerSessionManager manager;
- private final PCEPTopologyProviderDependencies dependenciesProvider;
- private final PCEPTopologyConfiguration configDependencies;
+final class PCEPTopologyProvider extends DefaultTopologyReference {
+ private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyProvider.class);
+
+ private final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier;
+ private final PCEPTopologyProviderDependencies dependencies;
private final InstructionScheduler scheduler;
- private ObjectRegistration<NetworkTopologyPcepProgrammingService> network;
- private ObjectRegistration<NetworkTopologyPcepService> element;
+ // High-level state bits: currently running asynchronous operation, current configuration and the next configuration
+ // to apply after the async operation completes
+ @GuardedBy("this")
+ private ListenableFuture<?> asyncOperation;
+ @GuardedBy("this")
+ private PCEPTopologyConfiguration currentConfig;
+ @GuardedBy("this")
+ private Optional<PCEPTopologyConfiguration> nextConfig;
+
+ // Future indicating shutdown in progress
+ @GuardedBy("this")
+ private SettableFuture<Empty> stopFuture;
+
+ // Low-level state bits
+ @GuardedBy("this")
+ private ServerSessionManager manager;
+ @GuardedBy("this")
+ private PCEPStatefulPeerProposal proposal;
+ @GuardedBy("this")
private Channel channel;
+ @GuardedBy("this")
+ private Registration networkReg;
+ @GuardedBy("this")
+ private Registration elementReg;
- private PCEPTopologyProvider(
- final PCEPTopologyConfiguration configDependencies,
- final PCEPTopologyProviderDependencies dependenciesProvider,
- final ServerSessionManager manager, final InstructionScheduler scheduler) {
- super(configDependencies.getTopology());
- this.dependenciesProvider = requireNonNull(dependenciesProvider);
- this.configDependencies = configDependencies;
- this.manager = requireNonNull(manager);
+ PCEPTopologyProvider(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
+ final PCEPTopologyProviderDependencies dependencies, final InstructionScheduler scheduler) {
+ super(instanceIdentifier);
+ this.instanceIdentifier = requireNonNull(instanceIdentifier);
+ this.dependencies = requireNonNull(dependencies);
this.scheduler = requireNonNull(scheduler);
}
- public static PCEPTopologyProvider create(final PCEPTopologyProviderDependencies dependenciesProvider,
- final InstructionScheduler scheduler, final PCEPTopologyConfiguration configDependencies) {
- final List<PCEPCapability> capabilities = dependenciesProvider.getPCEPDispatcher()
- .getPCEPSessionNegotiatorFactory().getPCEPSessionProposalFactory().getCapabilities();
- if (capabilities.stream().filter(PCEPCapability::isStateful).findAny().isEmpty()) {
- throw new IllegalStateException(
- "Stateful capability not defined, aborting PCEP Topology Provider instantiation");
+ synchronized ListenableFuture<?> stop() {
+ if (stopFuture != null) {
+ // Already stopping, just return the future
+ return stopFuture;
+ }
+
+ stopFuture = SettableFuture.create();
+ applyConfiguration(null);
+ if (asyncOperation == null) {
+ stopFuture.set(Empty.getInstance());
+ }
+ return stopFuture;
+ }
+
+ synchronized void updateConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+ // FIXME: BGPCEP-960: this check should be a one-time thing in PCEPTopologyTracker startup once we have OSGi DS
+ final var effectiveConfig = dependencies.getPCEPDispatcher().getPCEPSessionNegotiatorFactory()
+ .getPCEPSessionProposalFactory().getCapabilities().stream()
+ .anyMatch(PCEPCapability::isStateful) ? newConfiguration : null;
+
+ applyConfiguration(effectiveConfig);
+ }
+
+ @Holding("this")
+ private void applyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+ if (asyncOperation != null) {
+ LOG.debug("Topology Provider {} is undergoing reconfiguration, delaying reconfiguration", topologyId());
+ nextConfig = Optional.ofNullable(newConfiguration);
+ } else {
+ doApplyConfiguration(newConfiguration);
}
+ }
+
+ @Holding("this")
+ private void doApplyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+ LOG.debug("Topology Provider {} applying configuration {}", topologyId(), newConfiguration);
+
+ // Perform obvious enable/disable operations
+ if (newConfiguration == null) {
+ if (currentConfig != null) {
+ LOG.info("Topology Provider {} lost configuration, disabling it", topologyId());
+ disable();
+ }
+ return;
+ }
+ if (currentConfig == null) {
+ LOG.info("Topology Provider {} received configuration, enabling it", topologyId());
+ enable(newConfiguration);
+ return;
+ }
+
+ // We need to perform a complete restart if the listen address changes
+ final var currentAddress = currentConfig.getAddress();
+ final var newAddress = newConfiguration.getAddress();
+ if (!currentAddress.equals(newAddress)) {
+ LOG.info("Topology Provider {} listen address changed from {} to {}, restarting", topologyId(),
+ currentAddress, newAddress);
+ applyConfiguration(null);
+ applyConfiguration(newConfiguration);
+ return;
+ }
+
+ // TCP-MD5 configuration is propagated from the server channel to individual channels. For any node that has
+ // changed this configuration we need to tear down any existing session.
+ final var currentKeys = currentConfig.getKeys().asMap();
+ final var newKeys = newConfiguration.getKeys().asMap();
+ final var outdatedNodes = Stream.concat(currentKeys.keySet().stream(), newKeys.keySet().stream())
+ .distinct()
+ .filter(nodeId -> !Arrays.equals(currentKeys.get(nodeId), newKeys.get(nodeId)))
+ .collect(Collectors.toUnmodifiableList());
+
+ proposal.setSpeakerIds(newConfiguration.getSpeakerIds());
+ manager.setRpcTimeout(newConfiguration.getRpcTimeout());
+ if (!outdatedNodes.isEmpty()) {
+ LOG.info("Topology Provider {} updating {} TCP-MD5 keys", topologyId(), outdatedNodes.size());
+ if (channel.config().setOption(EpollChannelOption.TCP_MD5SIG, newKeys)) {
+ manager.tearDownSessions(outdatedNodes);
+ } else {
+ LOG.warn("Topology Provider {} failed to update TCP-MD5 keys", topologyId());
+ }
+ }
+
+ currentConfig = newConfiguration;
+ LOG.info("Topology Provider {} configuration updated", topologyId());
+ }
+
+ @Holding("this")
+ private void enable(final PCEPTopologyConfiguration newConfiguration) {
+ // Assert we are performing an asynchronous operation
+ final var future = startOperation();
+ currentConfig = newConfiguration;
- return new PCEPTopologyProvider(configDependencies, dependenciesProvider,
- new ServerSessionManager(dependenciesProvider, configDependencies), scheduler);
+ // First start the manager
+ manager = new ServerSessionManager(instanceIdentifier, dependencies, newConfiguration.getRpcTimeout());
+ final var managerStart = manager.start();
+ managerStart.addListener(() -> enableChannel(future, Futures.getUnchecked(managerStart)),
+ MoreExecutors.directExecutor());
}
- public void instantiateServiceInstance() throws ExecutionException, InterruptedException {
- final RpcProviderService rpcRegistry = dependenciesProvider.getRpcProviderRegistry();
+ private synchronized void enableChannel(final SettableFuture<Empty> future, final Boolean managerSuccess) {
+ if (!managerSuccess) {
+ manager = null;
+ currentConfig = null;
+ finishOperation(future);
+ return;
+ }
- element = requireNonNull(rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
- new TopologyRPCs(manager), Set.of(configDependencies.getTopology())));
+ proposal = new PCEPStatefulPeerProposal(dependencies.getDataBroker(), instanceIdentifier,
+ currentConfig.getSpeakerIds());
- network = requireNonNull(rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
- new TopologyProgramming(scheduler, manager), Set.of(configDependencies.getTopology())));
+ LOG.info("PCEP Topology Provider {} starting server channel", topologyId());
+ final var channelFuture = dependencies.getPCEPDispatcher().createServer(
+ new PCEPDispatcherDependenciesImpl(manager, proposal, currentConfig));
+ channelFuture.addListener(ignored -> enableRPCs(future, channelFuture));
+ }
- manager.instantiateServiceInstance();
- final ChannelFuture channelFuture = dependenciesProvider.getPCEPDispatcher()
- .createServer(manager.getPCEPDispatcherDependencies());
- channelFuture.get();
+ private synchronized void enableRPCs(final SettableFuture<Empty> future, final ChannelFuture channelFuture) {
+ final var channelFailure = channelFuture.cause();
+ if (channelFailure != null) {
+ LOG.error("Topology Provider {} failed to initialize server channel", topologyId(), channelFailure);
+ disableManager(future);
+ return;
+ }
channel = channelFuture.channel();
+
+ // Register RPCs
+ final RpcProviderService rpcRegistry = dependencies.getRpcProviderRegistry();
+ elementReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
+ new TopologyRPCs(manager), Set.of(instanceIdentifier));
+ networkReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
+ new TopologyProgramming(scheduler, manager), Set.of(instanceIdentifier));
+
+ // We are now completely initialized
+ LOG.info("PCEP Topology Provider {} enabled", topologyId());
+ finishOperation(future);
+ }
+
+ @Holding("this")
+ private void disable() {
+ // Unregister RPCs
+ if (networkReg != null) {
+ networkReg.close();
+ networkReg = null;
+ }
+ if (elementReg != null) {
+ elementReg.close();
+ elementReg = null;
+ }
+
+ // Assert we are performing an asynchronous operation
+ final var future = startOperation();
+
+ // Disable channel
+ final var channelFuture = channel.close();
+ channel = null;
+ channelFuture.addListener(ignored -> disableManager(future));
+ }
+
+ @Holding("this")
+ private void disableManager(final SettableFuture<Empty> future) {
+ proposal = null;
+ final var managerStop = manager.stop();
+ manager = null;
+ managerStop.addListener(() -> finishStopManager(future), MoreExecutors.directExecutor());
}
- public FluentFuture<? extends CommitInfo> closeServiceInstance() {
- //FIXME return also channelClose once ListenableFuture implements wildcard
- channel.close().addListener((ChannelFutureListener) future ->
- checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()));
+ private synchronized void finishStopManager(final SettableFuture<Empty> future) {
+ // We are now completely shut down
+ currentConfig = null;
+ finishOperation(future);
+ }
- if (network != null) {
- network.close();
- network = null;
+ @Holding("this")
+ private SettableFuture<Empty> startOperation() {
+ verify(asyncOperation == null, "Operation %s has not finished yet", asyncOperation);
+ final var future = SettableFuture.<Empty>create();
+ asyncOperation = future;
+ return future;
+ }
+
+ @Holding("this")
+ private void finishOperation(final SettableFuture<Empty> future) {
+ asyncOperation = null;
+ future.set(Empty.getInstance());
+
+ // Process next configuration change if there is one
+ if (nextConfig != null) {
+ final var config = nextConfig.orElse(null);
+ nextConfig = null;
+ doApplyConfiguration(config);
+ return;
}
- if (element != null) {
- element.close();
- element = null;
+
+ // Check if we are shutting down
+ if (stopFuture != null) {
+ stopFuture.set(Empty.getInstance());
}
- return manager.closeServiceInstance();
+ }
+
+ private @NonNull String topologyId() {
+ return TopologyUtils.friendlyId(instanceIdentifier);
}
}
\ No newline at end of file