2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.l2switch.loopremover.topology;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.l2switch.loopremover.util.InstanceIdentifierUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
49 * Listens to data change events on topology links
50 * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
51 * and maintains a topology graph using provided NetworkGraphService
52 * {@link org.opendaylight.l2switch.loopremover.topology.NetworkGraphService}.
 * It refreshes the graph after a delay (default 1 second, i.e. 1000 ms) to accommodate bursts of change events if they come in bulk.
54 * This is to avoid continuous refresh of graph on a series of change events in short time.
56 public class TopologyLinkDataChangeHandler implements DataChangeListener {
57 private static final Logger _logger = LoggerFactory.getLogger(TopologyLinkDataChangeHandler.class);
58 private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
59 private static final long DEFAULT_GRAPH_REFRESH_DELAY = 1000;
61 private final ScheduledExecutorService topologyDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
63 private final NetworkGraphService networkGraphService;
64 private boolean networkGraphRefreshScheduled = false;
65 private boolean threadReschedule = false;
66 private long graphRefreshDelay;
67 private String topologyId;
69 private final DataBroker dataBroker;
/**
 * Creates a handler that uses the given broker and graph service.
 *
 * @param dataBroker          broker used to register the listener and open transactions; must not be null
 * @param networkGraphService graph service that receives the topology links; must not be null
 * @throws NullPointerException if either argument is null
 */
public TopologyLinkDataChangeHandler(DataBroker dataBroker, NetworkGraphService networkGraphService) {
    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker should not be null.");
    this.networkGraphService =
        Preconditions.checkNotNull(networkGraphService, "networkGraphService should not be null.");
}
78 public void setGraphRefreshDelay(long graphRefreshDelay) {
79 if (graphRefreshDelay < 0) {
80 this.graphRefreshDelay = DEFAULT_GRAPH_REFRESH_DELAY;
82 else this.graphRefreshDelay = graphRefreshDelay;
85 public void setTopologyId(String topologyId) {
86 if (topologyId == null || topologyId.isEmpty()) {
87 this.topologyId = DEFAULT_TOPOLOGY_ID;
89 else this.topologyId = topologyId;
93 * Registers as a data listener to receive changes done to
94 * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
95 * under {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
96 * operation data root.
98 public ListenerRegistration<DataChangeListener> registerAsDataChangeListener() {
99 InstanceIdentifier<Link> linkInstance = InstanceIdentifier.builder(NetworkTopology.class)
100 .child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Link.class).toInstance();
101 return dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, linkInstance, this, AsyncDataBroker.DataChangeScope.BASE);
105 * Handler for onDataChanged events and schedules the building of the network graph.
106 * @param dataChangeEvent The data change event to process.
109 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
110 if(dataChangeEvent == null) {
113 Map<InstanceIdentifier<?>, DataObject> createdData = dataChangeEvent.getCreatedData();
114 Set<InstanceIdentifier<?>> removedPaths = dataChangeEvent.getRemovedPaths();
115 Map<InstanceIdentifier<?>, DataObject> originalData = dataChangeEvent.getOriginalData();
116 boolean isGraphUpdated = false;
118 if(createdData != null && !createdData.isEmpty()) {
119 Set<InstanceIdentifier<?>> linksIds = createdData.keySet();
120 for(InstanceIdentifier<?> linkId : linksIds) {
121 if(Link.class.isAssignableFrom(linkId.getTargetType())) {
122 Link link = (Link) createdData.get(linkId);
123 if(!(link.getLinkId().getValue().contains("host"))) {
124 isGraphUpdated = true;
125 _logger.debug("Graph is updated! Added Link {}", link.getLinkId().getValue());
132 if(removedPaths != null && !removedPaths.isEmpty() && originalData != null && !originalData.isEmpty()) {
133 for(InstanceIdentifier<?> instanceId : removedPaths) {
134 if(Link.class.isAssignableFrom(instanceId.getTargetType())) {
135 Link link = (Link) originalData.get(instanceId);
136 if(!(link.getLinkId().getValue().contains("host"))) {
137 isGraphUpdated = true;
138 _logger.debug("Graph is updated! Removed Link {}", link.getLinkId().getValue());
145 if(!isGraphUpdated) {
148 if(!networkGraphRefreshScheduled) {
150 if(!networkGraphRefreshScheduled) {
151 topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), graphRefreshDelay, TimeUnit.MILLISECONDS);
152 networkGraphRefreshScheduled = true;
153 _logger.debug("Scheduled Graph for refresh.");
157 _logger.debug("Already scheduled for network graph refresh.");
158 threadReschedule = true;
166 private class TopologyDataChangeEventProcessor implements Runnable {
170 if (threadReschedule) {
171 topologyDataChangeEventProcessor.schedule(this, graphRefreshDelay, TimeUnit.MILLISECONDS);
172 threadReschedule = false;
175 _logger.debug("In network graph refresh thread.");
176 networkGraphRefreshScheduled = false;
177 networkGraphService.clear();
178 List<Link> links = getLinksFromTopology();
179 if(links == null || links.isEmpty()) {
182 networkGraphService.addLinks(links);
183 final ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
184 updateNodeConnectorStatus(readWriteTransaction);
185 final CheckedFuture writeTxResultFuture = readWriteTransaction.submit();
186 Futures.addCallback(writeTxResultFuture, new FutureCallback() {
188 public void onSuccess(Object o) {
189 _logger.debug("TopologyLinkDataChangeHandler write successful for tx :{}", readWriteTransaction.getIdentifier());
193 public void onFailure(Throwable throwable) {
194 _logger.error("TopologyLinkDataChangeHandler write transaction {} failed", readWriteTransaction.getIdentifier(), throwable.getCause());
197 _logger.debug("Done with network graph refresh thread.");
200 private List<Link> getLinksFromTopology() {
201 InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils.generateTopologyInstanceIdentifier(topologyId);
202 Topology topology = null;
203 ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
205 Optional<Topology> topologyOptional = readOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, topologyInstanceIdentifier).get();
206 if(topologyOptional.isPresent()) {
207 topology = topologyOptional.get();
209 } catch(Exception e) {
210 _logger.error("Error reading topology {}", topologyInstanceIdentifier);
211 readOnlyTransaction.close();
212 throw new RuntimeException("Error reading from operational store, topology : " + topologyInstanceIdentifier, e);
214 readOnlyTransaction.close();
215 if(topology == null) {
218 List<Link> links = topology.getLink();
219 if(links == null || links.isEmpty()) {
222 List<Link> internalLinks = new ArrayList<>();
223 for(Link link : links) {
224 if(!(link.getLinkId().getValue().contains("host"))) {
225 internalLinks.add(link);
228 return internalLinks;
232 * @param readWriteTransaction
234 private void updateNodeConnectorStatus(ReadWriteTransaction readWriteTransaction) {
235 List<Link> allLinks = networkGraphService.getAllLinks();
236 if(allLinks == null || allLinks.isEmpty()) {
240 List<Link> mstLinks = networkGraphService.getLinksInMst();
241 for(Link link : allLinks) {
242 if(mstLinks != null && !mstLinks.isEmpty() && mstLinks.contains(link)) {
243 updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Forwarding);
244 updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Forwarding);
246 updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Discarding);
247 updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Discarding);
256 private NodeConnectorRef getSourceNodeConnectorRef(Link link) {
257 InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier
258 = InstanceIdentifierUtils.createNodeConnectorIdentifier(
259 link.getSource().getSourceNode().getValue(),
260 link.getSource().getSourceTp().getValue());
261 return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
268 private NodeConnectorRef getDestNodeConnectorRef(Link link) {
269 InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier
270 = InstanceIdentifierUtils.createNodeConnectorIdentifier(
271 link.getDestination().getDestNode().getValue(),
272 link.getDestination().getDestTp().getValue());
274 return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
278 * @param readWriteTransaction
279 * @param nodeConnectorRef
282 private void updateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, StpStatus stpStatus) {
283 StpStatusAwareNodeConnectorBuilder stpStatusAwareNodeConnectorBuilder = new StpStatusAwareNodeConnectorBuilder()
284 .setStatus(stpStatus);
285 checkIfExistAndUpdateNodeConnector(readWriteTransaction, nodeConnectorRef, stpStatusAwareNodeConnectorBuilder.build());
289 * @param readWriteTransaction
290 * @param nodeConnectorRef
291 * @param stpStatusAwareNodeConnector
293 private void checkIfExistAndUpdateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, StpStatusAwareNodeConnector stpStatusAwareNodeConnector) {
294 NodeConnector nc = null;
296 Optional<NodeConnector> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL, (InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue()).get();
297 if(dataObjectOptional.isPresent())
298 nc = (NodeConnector) dataObjectOptional.get();
299 } catch(Exception e) {
300 _logger.error("Error reading node connector {}", nodeConnectorRef.getValue());
301 readWriteTransaction.submit();
302 throw new RuntimeException("Error reading from operational store, node connector : " + nodeConnectorRef, e);
304 NodeConnectorBuilder nodeConnectorBuilder;
306 if(sameStatusPresent(nc.getAugmentation(StpStatusAwareNodeConnector.class), stpStatusAwareNodeConnector.getStatus())) {
310 //build instance id for StpStatusAwareNodeConnector
311 InstanceIdentifier<StpStatusAwareNodeConnector> stpStatusAwareNcInstanceId =
312 ((InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue())
313 .augmentation(StpStatusAwareNodeConnector.class);
314 //update StpStatusAwareNodeConnector in operational store
315 readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
316 _logger.debug("Merged Stp Status aware node connector in operational {} with status {}", stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
318 _logger.error("Unable to update Stp Status node connector {} note present in operational store", nodeConnectorRef.getValue());
323 * @param stpStatusAwareNodeConnector
326 private boolean sameStatusPresent(StpStatusAwareNodeConnector stpStatusAwareNodeConnector, StpStatus stpStatus) {
328 if(stpStatusAwareNodeConnector == null)
331 if(stpStatusAwareNodeConnector.getStatus() == null)
334 if(stpStatus.getIntValue() != stpStatusAwareNodeConnector.getStatus().getIntValue())