Bug2121 -- Add configuration parameter for TopologyId.
[l2switch.git] / loopremover / implementation / src / main / java / org / opendaylight / l2switch / loopremover / topology / TopologyLinkDataChangeHandler.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.l2switch.loopremover.topology;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
17 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
18 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.l2switch.loopremover.util.InstanceIdentifierUtils;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnectorBuilder;
29 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
30 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.binding.DataObject;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit;
47
48 /**
49  * Listens to data change events on topology links
50  * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
51  * and maintains a topology graph using provided NetworkGraphService
52  * {@link org.opendaylight.l2switch.loopremover.topology.NetworkGraphService}.
53  * It refreshes the graph after a delay(default 10 sec) to accommodate burst of change events if they come in bulk.
54  * This is to avoid continuous refresh of graph on a series of change events in short time.
55  */
56 public class TopologyLinkDataChangeHandler implements DataChangeListener {
57   private static final Logger _logger = LoggerFactory.getLogger(TopologyLinkDataChangeHandler.class);
58   private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
59   private static final long DEFAULT_GRAPH_REFRESH_DELAY = 1000;
60
61   private final ScheduledExecutorService topologyDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
62
63   private final NetworkGraphService networkGraphService;
64   private boolean networkGraphRefreshScheduled = false;
65   private boolean threadReschedule = false;
66   private long graphRefreshDelay;
67   private String topologyId;
68
69   private final DataBroker dataBroker;
70
71   public TopologyLinkDataChangeHandler(DataBroker dataBroker, NetworkGraphService networkGraphService) {
72     Preconditions.checkNotNull(dataBroker, "dataBroker should not be null.");
73     Preconditions.checkNotNull(networkGraphService, "networkGraphService should not be null.");
74     this.dataBroker = dataBroker;
75     this.networkGraphService = networkGraphService;
76   }
77
78   public void setGraphRefreshDelay(long graphRefreshDelay) {
79     if (graphRefreshDelay < 0) {
80       this.graphRefreshDelay = DEFAULT_GRAPH_REFRESH_DELAY;
81     }
82     else this.graphRefreshDelay = graphRefreshDelay;
83   }
84
85   public void setTopologyId(String topologyId) {
86     if (topologyId == null || topologyId.isEmpty()) {
87       this.topologyId = DEFAULT_TOPOLOGY_ID;
88     }
89     else this.topologyId = topologyId;
90   }
91
92   /**
93    * Registers as a data listener to receive changes done to
94    * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
95    * under {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
96    * operation data root.
97    */
98   public ListenerRegistration<DataChangeListener> registerAsDataChangeListener() {
99     InstanceIdentifier<Link> linkInstance = InstanceIdentifier.builder(NetworkTopology.class)
100         .child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Link.class).toInstance();
101     return dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, linkInstance, this, AsyncDataBroker.DataChangeScope.BASE);
102   }
103
104   /**
105    * Handler for onDataChanged events and schedules the building of the network graph.
106    * @param dataChangeEvent The data change event to process.
107    */
108   @Override
109   public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
110     if(dataChangeEvent == null) {
111       return;
112     }
113     Map<InstanceIdentifier<?>, DataObject> createdData = dataChangeEvent.getCreatedData();
114     Set<InstanceIdentifier<?>> removedPaths = dataChangeEvent.getRemovedPaths();
115     Map<InstanceIdentifier<?>, DataObject> originalData = dataChangeEvent.getOriginalData();
116     boolean isGraphUpdated = false;
117
118     if(createdData != null && !createdData.isEmpty()) {
119       Set<InstanceIdentifier<?>> linksIds = createdData.keySet();
120       for(InstanceIdentifier<?> linkId : linksIds) {
121         if(Link.class.isAssignableFrom(linkId.getTargetType())) {
122           Link link = (Link) createdData.get(linkId);
123           if(!(link.getLinkId().getValue().contains("host"))) {
124             isGraphUpdated = true;
125             _logger.debug("Graph is updated! Added Link {}", link.getLinkId().getValue());
126             break;
127           }
128         }
129       }
130     }
131
132     if(removedPaths != null && !removedPaths.isEmpty() && originalData != null && !originalData.isEmpty()) {
133       for(InstanceIdentifier<?> instanceId : removedPaths) {
134         if(Link.class.isAssignableFrom(instanceId.getTargetType())) {
135           Link link = (Link) originalData.get(instanceId);
136           if(!(link.getLinkId().getValue().contains("host"))) {
137             isGraphUpdated = true;
138             _logger.debug("Graph is updated! Removed Link {}", link.getLinkId().getValue());
139             break;
140           }
141         }
142       }
143     }
144
145     if(!isGraphUpdated) {
146       return;
147     }
148     if(!networkGraphRefreshScheduled) {
149       synchronized(this) {
150         if(!networkGraphRefreshScheduled) {
151           topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), graphRefreshDelay, TimeUnit.MILLISECONDS);
152           networkGraphRefreshScheduled = true;
153           _logger.debug("Scheduled Graph for refresh.");
154         }
155       }
156     } else {
157       _logger.debug("Already scheduled for network graph refresh.");
158       threadReschedule = true;
159     }
160   }
161
162
163   /**
164    *
165    */
166   private class TopologyDataChangeEventProcessor implements Runnable {
167
168     @Override
169     public void run() {
170       if (threadReschedule) {
171         topologyDataChangeEventProcessor.schedule(this, graphRefreshDelay, TimeUnit.MILLISECONDS);
172         threadReschedule = false;
173         return;
174       }
175       _logger.debug("In network graph refresh thread.");
176       networkGraphRefreshScheduled = false;
177       networkGraphService.clear();
178       List<Link> links = getLinksFromTopology();
179       if(links == null || links.isEmpty()) {
180         return;
181       }
182       networkGraphService.addLinks(links);
183       final ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
184       updateNodeConnectorStatus(readWriteTransaction);
185       final CheckedFuture writeTxResultFuture = readWriteTransaction.submit();
186       Futures.addCallback(writeTxResultFuture, new FutureCallback() {
187         @Override
188         public void onSuccess(Object o) {
189           _logger.debug("TopologyLinkDataChangeHandler write successful for tx :{}", readWriteTransaction.getIdentifier());
190         }
191
192         @Override
193         public void onFailure(Throwable throwable) {
194           _logger.error("TopologyLinkDataChangeHandler write transaction {} failed", readWriteTransaction.getIdentifier(), throwable.getCause());
195         }
196       });
197       _logger.debug("Done with network graph refresh thread.");
198     }
199
200     private List<Link> getLinksFromTopology() {
201       InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils.generateTopologyInstanceIdentifier(topologyId);
202       Topology topology = null;
203       ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
204       try {
205         Optional<Topology> topologyOptional = readOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, topologyInstanceIdentifier).get();
206         if(topologyOptional.isPresent()) {
207           topology = topologyOptional.get();
208         }
209       } catch(Exception e) {
210         _logger.error("Error reading topology {}", topologyInstanceIdentifier);
211         readOnlyTransaction.close();
212         throw new RuntimeException("Error reading from operational store, topology : " + topologyInstanceIdentifier, e);
213       }
214       readOnlyTransaction.close();
215       if(topology == null) {
216         return null;
217       }
218       List<Link> links = topology.getLink();
219       if(links == null || links.isEmpty()) {
220         return null;
221       }
222       List<Link> internalLinks = new ArrayList<>();
223       for(Link link : links) {
224         if(!(link.getLinkId().getValue().contains("host"))) {
225           internalLinks.add(link);
226         }
227       }
228       return internalLinks;
229     }
230
231     /**
232      * @param readWriteTransaction
233      */
234     private void updateNodeConnectorStatus(ReadWriteTransaction readWriteTransaction) {
235       List<Link> allLinks = networkGraphService.getAllLinks();
236       if(allLinks == null || allLinks.isEmpty()) {
237         return;
238       }
239
240       List<Link> mstLinks = networkGraphService.getLinksInMst();
241       for(Link link : allLinks) {
242         if(mstLinks != null && !mstLinks.isEmpty() && mstLinks.contains(link)) {
243           updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Forwarding);
244           updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Forwarding);
245         } else {
246           updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Discarding);
247           updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Discarding);
248         }
249       }
250     }
251
252     /**
253      * @param link
254      * @return
255      */
256     private NodeConnectorRef getSourceNodeConnectorRef(Link link) {
257       InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier
258           = InstanceIdentifierUtils.createNodeConnectorIdentifier(
259           link.getSource().getSourceNode().getValue(),
260           link.getSource().getSourceTp().getValue());
261       return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
262     }
263
264     /**
265      * @param link
266      * @return
267      */
268     private NodeConnectorRef getDestNodeConnectorRef(Link link) {
269       InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier
270           = InstanceIdentifierUtils.createNodeConnectorIdentifier(
271           link.getDestination().getDestNode().getValue(),
272           link.getDestination().getDestTp().getValue());
273
274       return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
275     }
276
277     /**
278      * @param readWriteTransaction
279      * @param nodeConnectorRef
280      * @param stpStatus
281      */
282     private void updateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, StpStatus stpStatus) {
283       StpStatusAwareNodeConnectorBuilder stpStatusAwareNodeConnectorBuilder = new StpStatusAwareNodeConnectorBuilder()
284           .setStatus(stpStatus);
285       checkIfExistAndUpdateNodeConnector(readWriteTransaction, nodeConnectorRef, stpStatusAwareNodeConnectorBuilder.build());
286     }
287
288     /**
289      * @param readWriteTransaction
290      * @param nodeConnectorRef
291      * @param stpStatusAwareNodeConnector
292      */
293     private void checkIfExistAndUpdateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, StpStatusAwareNodeConnector stpStatusAwareNodeConnector) {
294       NodeConnector nc = null;
295       try {
296         Optional<NodeConnector> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL, (InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue()).get();
297         if(dataObjectOptional.isPresent())
298           nc = (NodeConnector) dataObjectOptional.get();
299       } catch(Exception e) {
300         _logger.error("Error reading node connector {}", nodeConnectorRef.getValue());
301         readWriteTransaction.submit();
302         throw new RuntimeException("Error reading from operational store, node connector : " + nodeConnectorRef, e);
303       }
304       NodeConnectorBuilder nodeConnectorBuilder;
305       if(nc != null) {
306         if(sameStatusPresent(nc.getAugmentation(StpStatusAwareNodeConnector.class), stpStatusAwareNodeConnector.getStatus())) {
307           return;
308         }
309
310         //build instance id for StpStatusAwareNodeConnector
311         InstanceIdentifier<StpStatusAwareNodeConnector> stpStatusAwareNcInstanceId =
312             ((InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue())
313                 .augmentation(StpStatusAwareNodeConnector.class);
314         //update StpStatusAwareNodeConnector in operational store
315         readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
316         _logger.debug("Merged Stp Status aware node connector in operational {} with status {}", stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
317       } else {
318         _logger.error("Unable to update Stp Status node connector {} note present in  operational store", nodeConnectorRef.getValue());
319       }
320     }
321
322     /**
323      * @param stpStatusAwareNodeConnector
324      * @return
325      */
326     private boolean sameStatusPresent(StpStatusAwareNodeConnector stpStatusAwareNodeConnector, StpStatus stpStatus) {
327
328       if(stpStatusAwareNodeConnector == null)
329         return false;
330
331       if(stpStatusAwareNodeConnector.getStatus() == null)
332         return false;
333
334       if(stpStatus.getIntValue() != stpStatusAwareNodeConnector.getStatus().getIntValue())
335         return false;
336
337       return true;
338     }
339   }
340 }