2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.l2switch.loopremover.topology;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.l2switch.loopremover.util.InstanceIdentifierUtils;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnectorBuilder;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Listens to data change events on topology links {@link
48 * org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
49 * and maintains a topology graph using provided NetworkGraphService
50 * {@link org.opendaylight.l2switch.loopremover.topology.NetworkGraphService}.
51 * It refreshes the graph after a delay(default 10 sec) to accommodate burst of
52 * change events if they come in bulk. This is to avoid continuous refresh of
53 * graph on a series of change events in short time.
55 public class TopologyLinkDataChangeHandler implements DataTreeChangeListener<Link> {
56 private static final Logger LOG = LoggerFactory.getLogger(TopologyLinkDataChangeHandler.class);
57 private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
58 private static final long DEFAULT_GRAPH_REFRESH_DELAY = 1000;
60 private final ScheduledExecutorService topologyDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
62 private final NetworkGraphService networkGraphService;
63 private volatile boolean networkGraphRefreshScheduled = false;
64 private volatile boolean threadReschedule = false;
65 private long graphRefreshDelay;
66 private String topologyId;
68 private final DataBroker dataBroker;
70 public TopologyLinkDataChangeHandler(DataBroker dataBroker, NetworkGraphService networkGraphService) {
71 Preconditions.checkNotNull(dataBroker, "dataBroker should not be null.");
72 Preconditions.checkNotNull(networkGraphService, "networkGraphService should not be null.");
73 this.dataBroker = dataBroker;
74 this.networkGraphService = networkGraphService;
77 public void setGraphRefreshDelay(long graphRefreshDelay) {
78 if (graphRefreshDelay < 0) {
79 this.graphRefreshDelay = DEFAULT_GRAPH_REFRESH_DELAY;
81 this.graphRefreshDelay = graphRefreshDelay;
85 public void setTopologyId(String topologyId) {
86 if (topologyId == null || topologyId.isEmpty()) {
87 this.topologyId = DEFAULT_TOPOLOGY_ID;
89 this.topologyId = topologyId;
94 * Registers as a data listener to receive changes done to {@link org.opendaylight.yang.gen.v1.urn.tbd.params
95 * .xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
97 * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
98 * operation data root.
100 public ListenerRegistration<TopologyLinkDataChangeHandler> registerAsDataChangeListener() {
101 InstanceIdentifier<Link> linkInstance = InstanceIdentifier.builder(NetworkTopology.class)
102 .child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Link.class).build();
103 return dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
104 LogicalDatastoreType.OPERATIONAL, linkInstance), this);
108 * Handler for onDataChanged events and schedules the building of the
112 public void onDataTreeChanged(Collection<DataTreeModification<Link>> changes) {
113 boolean isGraphUpdated = false;
115 for (DataTreeModification<Link> change: changes) {
116 DataObjectModification<Link> rootNode = change.getRootNode();
117 switch (rootNode.getModificationType()) {
119 Link createdLink = rootNode.getDataAfter();
120 if (rootNode.getDataBefore() == null && !createdLink.getLinkId().getValue().contains("host")) {
121 isGraphUpdated = true;
122 LOG.debug("Graph is updated! Added Link {}", createdLink.getLinkId().getValue());
126 Link deletedLink = rootNode.getDataBefore();
127 if (!deletedLink.getLinkId().getValue().contains("host")) {
128 isGraphUpdated = true;
129 LOG.debug("Graph is updated! Removed Link {}", deletedLink.getLinkId().getValue());
138 if (!isGraphUpdated) {
141 if (!networkGraphRefreshScheduled) {
142 synchronized (this) {
143 if (!networkGraphRefreshScheduled) {
144 topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), graphRefreshDelay,
145 TimeUnit.MILLISECONDS);
146 networkGraphRefreshScheduled = true;
147 LOG.debug("Scheduled Graph for refresh.");
151 LOG.debug("Already scheduled for network graph refresh.");
152 threadReschedule = true;
156 private class TopologyDataChangeEventProcessor implements Runnable {
160 if (threadReschedule) {
161 topologyDataChangeEventProcessor.schedule(this, graphRefreshDelay, TimeUnit.MILLISECONDS);
162 threadReschedule = false;
165 LOG.debug("In network graph refresh thread.");
166 networkGraphRefreshScheduled = false;
167 networkGraphService.clear();
168 List<Link> links = getLinksFromTopology();
169 if (links == null || links.isEmpty()) {
172 networkGraphService.addLinks(links);
173 final ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
174 updateNodeConnectorStatus(readWriteTransaction);
175 Futures.addCallback(readWriteTransaction.submit(), new FutureCallback<Void>() {
177 public void onSuccess(Void notUsed) {
178 LOG.debug("TopologyLinkDataChangeHandler write successful for tx :{}",
179 readWriteTransaction.getIdentifier());
183 public void onFailure(Throwable throwable) {
184 LOG.error("TopologyLinkDataChangeHandler write transaction {} failed",
185 readWriteTransaction.getIdentifier(), throwable.getCause());
187 }, MoreExecutors.directExecutor());
188 LOG.debug("Done with network graph refresh thread.");
191 private List<Link> getLinksFromTopology() {
192 InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils
193 .generateTopologyInstanceIdentifier(topologyId);
194 Topology topology = null;
195 ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
197 Optional<Topology> topologyOptional = readOnlyTransaction
198 .read(LogicalDatastoreType.OPERATIONAL, topologyInstanceIdentifier).get();
199 if (topologyOptional.isPresent()) {
200 topology = topologyOptional.get();
202 } catch (InterruptedException | ExecutionException e) {
203 LOG.error("Error reading topology {}", topologyInstanceIdentifier);
204 readOnlyTransaction.close();
205 throw new RuntimeException(
206 "Error reading from operational store, topology : " + topologyInstanceIdentifier, e);
208 readOnlyTransaction.close();
209 if (topology == null) {
212 List<Link> links = topology.getLink();
213 if (links == null || links.isEmpty()) {
216 List<Link> internalLinks = new ArrayList<>();
217 for (Link link : links) {
218 if (!link.getLinkId().getValue().contains("host")) {
219 internalLinks.add(link);
222 return internalLinks;
225 private void updateNodeConnectorStatus(ReadWriteTransaction readWriteTransaction) {
226 List<Link> allLinks = networkGraphService.getAllLinks();
227 if (allLinks == null || allLinks.isEmpty()) {
231 List<Link> mstLinks = networkGraphService.getLinksInMst();
232 for (Link link : allLinks) {
233 if (mstLinks != null && !mstLinks.isEmpty() && mstLinks.contains(link)) {
234 updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Forwarding);
235 updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Forwarding);
237 updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Discarding);
238 updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Discarding);
243 private NodeConnectorRef getSourceNodeConnectorRef(Link link) {
244 InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier = InstanceIdentifierUtils
245 .createNodeConnectorIdentifier(link.getSource().getSourceNode().getValue(),
246 link.getSource().getSourceTp().getValue());
247 return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
250 private NodeConnectorRef getDestNodeConnectorRef(Link link) {
251 InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier = InstanceIdentifierUtils
252 .createNodeConnectorIdentifier(link.getDestination().getDestNode().getValue(),
253 link.getDestination().getDestTp().getValue());
255 return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
258 private void updateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef,
259 StpStatus stpStatus) {
260 StpStatusAwareNodeConnectorBuilder stpStatusAwareNodeConnectorBuilder =
261 new StpStatusAwareNodeConnectorBuilder().setStatus(stpStatus);
262 checkIfExistAndUpdateNodeConnector(readWriteTransaction, nodeConnectorRef,
263 stpStatusAwareNodeConnectorBuilder.build());
266 private void checkIfExistAndUpdateNodeConnector(ReadWriteTransaction readWriteTransaction,
267 NodeConnectorRef nodeConnectorRef, StpStatusAwareNodeConnector stpStatusAwareNodeConnector) {
268 NodeConnector nc = null;
270 Optional<NodeConnector> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL,
271 (InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue()).get();
272 if (dataObjectOptional.isPresent()) {
273 nc = dataObjectOptional.get();
275 } catch (InterruptedException | ExecutionException e) {
276 LOG.error("Error reading node connector {}", nodeConnectorRef.getValue());
277 readWriteTransaction.submit();
278 throw new RuntimeException("Error reading from operational store, node connector : " + nodeConnectorRef,
283 if (sameStatusPresent(nc.augmentation(StpStatusAwareNodeConnector.class),
284 stpStatusAwareNodeConnector.getStatus())) {
288 // build instance id for StpStatusAwareNodeConnector
289 InstanceIdentifier<StpStatusAwareNodeConnector> stpStatusAwareNcInstanceId =
290 ((InstanceIdentifier<NodeConnector>) nodeConnectorRef
291 .getValue()).augmentation(StpStatusAwareNodeConnector.class);
292 // update StpStatusAwareNodeConnector in operational store
293 readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, stpStatusAwareNcInstanceId,
294 stpStatusAwareNodeConnector);
295 LOG.debug("Merged Stp Status aware node connector in operational {} with status {}",
296 stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
298 LOG.error("Unable to update Stp Status node connector {} note present in operational store",
299 nodeConnectorRef.getValue());
303 private boolean sameStatusPresent(StpStatusAwareNodeConnector stpStatusAwareNodeConnector,
304 StpStatus stpStatus) {
306 if (stpStatusAwareNodeConnector == null) {
310 if (stpStatusAwareNodeConnector.getStatus() == null) {
314 if (stpStatus.getIntValue() != stpStatusAwareNodeConnector.getStatus().getIntValue()) {