Add PCEPTopoloy{Instance,Singleton,Tracker}
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / PCEPTopologyProvider.java
index 24e83b179c607d4de2c8e6828f1e9ad953fc0f03..df4bbbb6aa61c36345971dc44c25a0824439a099 100644 (file)
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import java.util.List;
+import io.netty.channel.epoll.EpollChannelOption;
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyConfiguration;
-import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyProviderDependencies;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
-import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.protocol.pcep.PCEPCapability;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev181109.NetworkTopologyPcepProgrammingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.NetworkTopologyPcepService;
-import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public final class PCEPTopologyProvider extends DefaultTopologyReference {
-    private final ServerSessionManager manager;
-    private final PCEPTopologyProviderDependencies dependenciesProvider;
-    private final PCEPTopologyConfiguration configDependencies;
+final class PCEPTopologyProvider extends DefaultTopologyReference {
+    private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyProvider.class);
+
+    private final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier;
+    private final PCEPTopologyProviderDependencies dependencies;
     private final InstructionScheduler scheduler;
 
-    private ObjectRegistration<NetworkTopologyPcepProgrammingService> network;
-    private ObjectRegistration<NetworkTopologyPcepService> element;
+    // High-level state bits: currently running asynchronous operation, current configuration and the next configuration
+    // to apply after the async operation completes
+    @GuardedBy("this")
+    private ListenableFuture<?> asyncOperation;
+    @GuardedBy("this")
+    private PCEPTopologyConfiguration currentConfig;
+    @GuardedBy("this")
+    private Optional<PCEPTopologyConfiguration> nextConfig;
+
+    // Future indicating shutdown in progress
+    @GuardedBy("this")
+    private SettableFuture<Empty> stopFuture;
+
+    // Low-level state bits
+    @GuardedBy("this")
+    private ServerSessionManager manager;
+    @GuardedBy("this")
+    private PCEPStatefulPeerProposal proposal;
+    @GuardedBy("this")
     private Channel channel;
+    @GuardedBy("this")
+    private Registration networkReg;
+    @GuardedBy("this")
+    private Registration elementReg;
 
-    private PCEPTopologyProvider(
-            final PCEPTopologyConfiguration configDependencies,
-            final PCEPTopologyProviderDependencies dependenciesProvider,
-            final ServerSessionManager manager, final InstructionScheduler scheduler) {
-        super(configDependencies.getTopology());
-        this.dependenciesProvider = requireNonNull(dependenciesProvider);
-        this.configDependencies = configDependencies;
-        this.manager = requireNonNull(manager);
+    PCEPTopologyProvider(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
+            final PCEPTopologyProviderDependencies dependencies, final InstructionScheduler scheduler) {
+        super(instanceIdentifier);
+        this.instanceIdentifier = requireNonNull(instanceIdentifier);
+        this.dependencies = requireNonNull(dependencies);
         this.scheduler = requireNonNull(scheduler);
     }
 
-    public static PCEPTopologyProvider create(final PCEPTopologyProviderDependencies dependenciesProvider,
-            final InstructionScheduler scheduler, final PCEPTopologyConfiguration configDependencies) {
-        final List<PCEPCapability> capabilities = dependenciesProvider.getPCEPDispatcher()
-                .getPCEPSessionNegotiatorFactory().getPCEPSessionProposalFactory().getCapabilities();
-        if (capabilities.stream().filter(PCEPCapability::isStateful).findAny().isEmpty()) {
-            throw new IllegalStateException(
-                "Stateful capability not defined, aborting PCEP Topology Provider instantiation");
+    synchronized ListenableFuture<?> stop() {
+        if (stopFuture != null) {
+            // Already stopping, just return the future
+            return stopFuture;
+        }
+
+        stopFuture = SettableFuture.create();
+        applyConfiguration(null);
+        if (asyncOperation == null) {
+            stopFuture.set(Empty.getInstance());
+        }
+        return stopFuture;
+    }
+
+    synchronized void updateConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+        // FIXME: BGPCEP-960: this check should be a one-time thing in PCEPTopologyTracker startup once we have OSGi DS
+        final var effectiveConfig = dependencies.getPCEPDispatcher().getPCEPSessionNegotiatorFactory()
+            .getPCEPSessionProposalFactory().getCapabilities().stream()
+            .anyMatch(PCEPCapability::isStateful) ? newConfiguration : null;
+
+        applyConfiguration(effectiveConfig);
+    }
+
+    @Holding("this")
+    private void applyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+        if (asyncOperation != null) {
+            LOG.debug("Topology Provider {} is undergoing reconfiguration, delaying reconfiguration", topologyId());
+            nextConfig = Optional.ofNullable(newConfiguration);
+        } else {
+            doApplyConfiguration(newConfiguration);
         }
+    }
+
+    @Holding("this")
+    private void doApplyConfiguration(final @Nullable PCEPTopologyConfiguration newConfiguration) {
+        LOG.debug("Topology Provider {} applying configuration {}", topologyId(), newConfiguration);
+
+        // Perform obvious enable/disable operations
+        if (newConfiguration == null) {
+            if (currentConfig != null) {
+                LOG.info("Topology Provider {} lost configuration, disabling it", topologyId());
+                disable();
+            }
+            return;
+        }
+        if (currentConfig == null) {
+            LOG.info("Topology Provider {} received configuration, enabling it", topologyId());
+            enable(newConfiguration);
+            return;
+        }
+
+        // We need to perform a complete restart if the listen address changes
+        final var currentAddress = currentConfig.getAddress();
+        final var newAddress = newConfiguration.getAddress();
+        if (!currentAddress.equals(newAddress)) {
+            LOG.info("Topology Provider {} listen address changed from {} to {}, restarting", topologyId(),
+                currentAddress, newAddress);
+            applyConfiguration(null);
+            applyConfiguration(newConfiguration);
+            return;
+        }
+
+        // TCP-MD5 configuration is propagated from the server channel to individual channels. For any node that has
+        // changed this configuration we need to tear down any existing session.
+        final var currentKeys = currentConfig.getKeys().asMap();
+        final var newKeys = newConfiguration.getKeys().asMap();
+        final var outdatedNodes = Stream.concat(currentKeys.keySet().stream(), newKeys.keySet().stream())
+            .distinct()
+            .filter(nodeId -> !Arrays.equals(currentKeys.get(nodeId), newKeys.get(nodeId)))
+            .collect(Collectors.toUnmodifiableList());
+
+        proposal.setSpeakerIds(newConfiguration.getSpeakerIds());
+        manager.setRpcTimeout(newConfiguration.getRpcTimeout());
+        if (!outdatedNodes.isEmpty()) {
+            LOG.info("Topology Provider {} updating {} TCP-MD5 keys", topologyId(), outdatedNodes.size());
+            if (channel.config().setOption(EpollChannelOption.TCP_MD5SIG, newKeys)) {
+                manager.tearDownSessions(outdatedNodes);
+            } else {
+                LOG.warn("Topology Provider {} failed to update TCP-MD5 keys", topologyId());
+            }
+        }
+
+        currentConfig = newConfiguration;
+        LOG.info("Topology Provider {} configuration updated", topologyId());
+    }
+
+    @Holding("this")
+    private void enable(final PCEPTopologyConfiguration newConfiguration) {
+        // Assert we are performing an asynchronous operation
+        final var future = startOperation();
+        currentConfig = newConfiguration;
 
-        return new PCEPTopologyProvider(configDependencies, dependenciesProvider,
-            new ServerSessionManager(dependenciesProvider, configDependencies), scheduler);
+        // First start the manager
+        manager = new ServerSessionManager(instanceIdentifier, dependencies, newConfiguration.getRpcTimeout());
+        final var managerStart = manager.start();
+        managerStart.addListener(() -> enableChannel(future, Futures.getUnchecked(managerStart)),
+            MoreExecutors.directExecutor());
     }
 
-    public void instantiateServiceInstance() throws ExecutionException, InterruptedException {
-        final RpcProviderService rpcRegistry = dependenciesProvider.getRpcProviderRegistry();
+    private synchronized void enableChannel(final SettableFuture<Empty> future, final Boolean managerSuccess) {
+        if (!managerSuccess) {
+            manager = null;
+            currentConfig = null;
+            finishOperation(future);
+            return;
+        }
 
-        element = requireNonNull(rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
-            new TopologyRPCs(manager), Set.of(configDependencies.getTopology())));
+        proposal = new PCEPStatefulPeerProposal(dependencies.getDataBroker(), instanceIdentifier,
+            currentConfig.getSpeakerIds());
 
-        network = requireNonNull(rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
-            new TopologyProgramming(scheduler, manager), Set.of(configDependencies.getTopology())));
+        LOG.info("PCEP Topology Provider {} starting server channel", topologyId());
+        final var channelFuture = dependencies.getPCEPDispatcher().createServer(
+            new PCEPDispatcherDependenciesImpl(manager, proposal, currentConfig));
+        channelFuture.addListener(ignored -> enableRPCs(future, channelFuture));
+    }
 
-        manager.instantiateServiceInstance();
-        final ChannelFuture channelFuture = dependenciesProvider.getPCEPDispatcher()
-                .createServer(manager.getPCEPDispatcherDependencies());
-        channelFuture.get();
+    private synchronized void enableRPCs(final SettableFuture<Empty> future, final ChannelFuture channelFuture) {
+        final var channelFailure = channelFuture.cause();
+        if (channelFailure != null) {
+            LOG.error("Topology Provider {} failed to initialize server channel", topologyId(), channelFailure);
+            disableManager(future);
+            return;
+        }
         channel = channelFuture.channel();
+
+        // Register RPCs
+        final RpcProviderService rpcRegistry = dependencies.getRpcProviderRegistry();
+        elementReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepService.class,
+            new TopologyRPCs(manager), Set.of(instanceIdentifier));
+        networkReg = rpcRegistry.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
+            new TopologyProgramming(scheduler, manager), Set.of(instanceIdentifier));
+
+        // We are now completely initialized
+        LOG.info("PCEP Topology Provider {} enabled", topologyId());
+        finishOperation(future);
+    }
+
+    @Holding("this")
+    private void disable() {
+        // Unregister RPCs
+        if (networkReg != null) {
+            networkReg.close();
+            networkReg = null;
+        }
+        if (elementReg != null) {
+            elementReg.close();
+            elementReg = null;
+        }
+
+        // Assert we are performing an asynchronous operation
+        final var future = startOperation();
+
+        // Disable channel
+        final var channelFuture = channel.close();
+        channel = null;
+        channelFuture.addListener(ignored -> disableManager(future));
+    }
+
+    @Holding("this")
+    private void disableManager(final SettableFuture<Empty> future) {
+        proposal = null;
+        final var managerStop = manager.stop();
+        manager = null;
+        managerStop.addListener(() -> finishStopManager(future), MoreExecutors.directExecutor());
     }
 
-    public FluentFuture<? extends CommitInfo> closeServiceInstance() {
-        //FIXME return also channelClose once ListenableFuture implements wildcard
-        channel.close().addListener((ChannelFutureListener) future ->
-                checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()));
+    private synchronized void finishStopManager(final SettableFuture<Empty> future) {
+        // We are now completely shut down
+        currentConfig = null;
+        finishOperation(future);
+    }
 
-        if (network != null) {
-            network.close();
-            network = null;
+    @Holding("this")
+    private SettableFuture<Empty> startOperation() {
+        verify(asyncOperation == null, "Operation %s has not finished yet", asyncOperation);
+        final var future = SettableFuture.<Empty>create();
+        asyncOperation = future;
+        return future;
+    }
+
+    @Holding("this")
+    private void finishOperation(final SettableFuture<Empty> future) {
+        asyncOperation = null;
+        future.set(Empty.getInstance());
+
+        // Process next configuration change if there is one
+        if (nextConfig != null) {
+            final var config = nextConfig.orElse(null);
+            nextConfig = null;
+            doApplyConfiguration(config);
+            return;
         }
-        if (element != null) {
-            element.close();
-            element = null;
+
+        // Check if we are shutting down
+        if (stopFuture != null) {
+            stopFuture.set(Empty.getInstance());
         }
-        return manager.closeServiceInstance();
+    }
+
+    private @NonNull String topologyId() {
+        return TopologyUtils.friendlyId(instanceIdentifier);
     }
 }
\ No newline at end of file