2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.pcep.topology.provider;
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.bgpcep.pcep.topology.provider.TopologyUtils.friendlyId;
14 import io.netty.util.HashedWheelTimer;
15 import io.netty.util.Timeout;
16 import io.netty.util.Timer;
17 import io.netty.util.TimerTask;
18 import java.util.Collection;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.TimeUnit;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.bgpcep.pcep.server.PceServerProvider;
26 import org.opendaylight.bgpcep.programming.spi.InstructionSchedulerFactory;
27 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.mdsal.binding.api.DataTreeModification;
31 import org.opendaylight.mdsal.binding.api.RpcProviderService;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.protocol.pcep.PCEPDispatcher;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TopologyTypes1;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.topology.pcep.type.TopologyPcep;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Primary entrypoint into this component. Once an instance of this class is instantiated, it will subscribe to
48 * changes to the configuration datastore. There it filters only topologies which have {@link TopologyPcep} type and for
49 * each one of those instantiates a cluster-wide singleton to handle lifecycle of services attached to that topology.
51 public final class PCEPTopologyTracker
52 implements PCEPTopologyProviderDependencies, ClusteredDataTreeChangeListener<TopologyPcep>, AutoCloseable {
// Logger shared by all instances of this class
private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyTracker.class);

// Services we are using. Each one is supplied through the constructor, which rejects null values.
// The first two are package-private and have no accessor in this class.
// NOTE(review): presumably they are read directly by PCEPTopologySingleton -- confirm against that class.
final @NonNull InstructionSchedulerFactory instructionSchedulerFactory;
final @NonNull ClusterSingletonServiceProvider singletonService;
private final @NonNull TopologySessionStatsRegistry stateRegistry;
private final @NonNull RpcProviderService rpcProviderRegistry;
private final @NonNull PceServerProvider pceServerProvider;
private final @NonNull PCEPDispatcher pcepDispatcher;
private final @NonNull DataBroker dataBroker;
64 // Timer used for RPC timeouts and session statistics scheduling
65 private final @NonNull HashedWheelTimer privateTimer = new HashedWheelTimer();
66 private final @NonNull Timer timer = new Timer() {
68 public Timeout newTimeout(final TimerTask task, final long delay, final TimeUnit unit) {
69 return privateTimer.newTimeout(task, delay, unit);
73 public Set<Timeout> stop() {
74 // Do not allow the timer to be shut down
75 throw new UnsupportedOperationException();
// We are reusing our monitor as the universal lock. We have to account for three distinct threads competing for
// our state:
// 1) the typical DTCL callback thread invoking onDataTreeChanged()
// 2) instance cleanup thread invoking finishDestroy()
// 3) framework shutdown thread invoking close()
//
// We need to track not only instances which are deemed alive by the class, but also all instances for which cleanup
// has not finished yet, so close() can properly wait for cleanup to finish.
//
// Since close() will terminate the DTCL subscription, the synchronization between 1) and 3) is rather trivial.
//
// The interaction between DTCL and cleanup is tricky. DTCL can report rapid create/destroy/create events and
// cleanup is asynchronous, so when the dust settles we need to end up in the correct overall state (created or
// destroyed).
//
// In order to achieve that without risking deadlocks, instances are tracked using a concurrent map and each
// 'create' edge allocates a new PCEPTopologySingleton object.
private final ConcurrentMap<TopologyKey, PCEPTopologySingleton> instances = new ConcurrentHashMap<>();

// Handle of the data tree change listener registration obtained in the constructor.
// NOTE(review): appears to be meant for access only under this object's monitor -- confirm.
private Registration reg;
100 public PCEPTopologyTracker(final DataBroker dataBroker, final ClusterSingletonServiceProvider singletonService,
101 final RpcProviderService rpcProviderRegistry, final PCEPDispatcher pcepDispatcher,
102 final InstructionSchedulerFactory instructionSchedulerFactory,
103 final TopologySessionStatsRegistry stateRegistry, final PceServerProvider pceServerProvider) {
104 this.dataBroker = requireNonNull(dataBroker);
105 this.singletonService = requireNonNull(singletonService);
106 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
107 this.pcepDispatcher = requireNonNull(pcepDispatcher);
108 this.instructionSchedulerFactory = requireNonNull(instructionSchedulerFactory);
109 this.stateRegistry = requireNonNull(stateRegistry);
110 this.pceServerProvider = requireNonNull(pceServerProvider);
112 reg = dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
113 InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class).child(TopologyTypes.class)
114 .augmentation(TopologyTypes1.class).child(TopologyPcep.class).build()), this);
115 LOG.info("PCEP Topology tracker initialized");
119 public PCEPDispatcher getPCEPDispatcher() {
120 return pcepDispatcher;
124 public RpcProviderService getRpcProviderRegistry() {
125 return rpcProviderRegistry;
129 public DataBroker getDataBroker() {
134 public TopologySessionStatsRegistry getStateRegistry() {
135 return stateRegistry;
139 public PceServerProvider getPceServerProvider() {
140 return pceServerProvider;
144 public Timer getTimer() {
149 public synchronized void close() {
151 // Already closed, bail out
155 LOG.info("PCEP Topology tracker shutting down");
159 // First pass: destroy all tracked instances
160 instances.values().forEach(PCEPTopologySingleton::destroy);
161 // Second pass: wait for cleanup
162 instances.values().forEach(PCEPTopologySingleton::awaitCleanup);
165 final var cancelledTasks = privateTimer.stop().size();
166 if (cancelledTasks != 0) {
167 LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
170 LOG.info("PCEP Topology tracker shut down");
174 public synchronized void onDataTreeChanged(final Collection<DataTreeModification<TopologyPcep>> changes) {
176 // Registration has been terminated, do not process any changes
180 for (var change : changes) {
181 final var root = change.getRootNode();
182 switch (root.getModificationType()) {
184 // We only care if the topology has been newly introduced, not when its details have changed
185 if (root.getDataBefore() == null) {
186 createInstance(extractTopologyKey(change));
190 destroyInstance(extractTopologyKey(change));
198 private void createInstance(final @NonNull TopologyKey topology) {
199 final var existing = instances.remove(topology);
200 final PCEPTopologySingleton instance;
201 if (existing == null) {
202 LOG.info("Creating topology instance for {}", friendlyId(topology));
203 instance = new PCEPTopologySingleton(this, topology);
205 LOG.info("Resurrecting topology instance for {}", friendlyId(topology));
206 instance = existing.resurrect();
208 instances.put(topology, instance);
211 private void destroyInstance(final @NonNull TopologyKey topology) {
212 final var existing = instances.get(topology);
213 if (existing != null) {
214 LOG.info("Destroying topology instance for {}", friendlyId(topology));
217 LOG.warn("Attempted to destroy non-existent topology instance for {}", friendlyId(topology));
221 void finishDestroy(final TopologyKey topology, final PCEPTopologySingleton instance) {
222 if (instances.remove(topology, instance)) {
223 LOG.info("Destroyed topology instance of {}", friendlyId(topology));
227 private static @NonNull TopologyKey extractTopologyKey(final DataTreeModification<?> change) {
228 final var path = change.getRootPath().getRootIdentifier();
229 return verifyNotNull(path.firstKeyOf(Topology.class), "No topology key in %s", path);