Update to work on Sodium SR1
[l2switch.git] / loopremover / implementation / src / main / java / org / opendaylight / l2switch / loopremover / topology / TopologyLinkDataChangeHandler.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.l2switch.loopremover.topology;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.l2switch.loopremover.util.InstanceIdentifierUtils;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnectorBuilder;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Listens to data change events on topology links {@link
48  * org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
49  * and maintains a topology graph using provided NetworkGraphService
50  * {@link org.opendaylight.l2switch.loopremover.topology.NetworkGraphService}.
51  * It refreshes the graph after a delay(default 10 sec) to accommodate burst of
52  * change events if they come in bulk. This is to avoid continuous refresh of
53  * graph on a series of change events in short time.
54  */
55 public class TopologyLinkDataChangeHandler implements DataTreeChangeListener<Link> {
56     private static final Logger LOG = LoggerFactory.getLogger(TopologyLinkDataChangeHandler.class);
57     private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
58     private static final long DEFAULT_GRAPH_REFRESH_DELAY = 1000;
59
60     private final ScheduledExecutorService topologyDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
61
62     private final NetworkGraphService networkGraphService;
63     private volatile boolean networkGraphRefreshScheduled = false;
64     private volatile boolean threadReschedule = false;
65     private long graphRefreshDelay;
66     private String topologyId;
67
68     private final DataBroker dataBroker;
69
70     public TopologyLinkDataChangeHandler(DataBroker dataBroker, NetworkGraphService networkGraphService) {
71         Preconditions.checkNotNull(dataBroker, "dataBroker should not be null.");
72         Preconditions.checkNotNull(networkGraphService, "networkGraphService should not be null.");
73         this.dataBroker = dataBroker;
74         this.networkGraphService = networkGraphService;
75     }
76
77     public void setGraphRefreshDelay(long graphRefreshDelay) {
78         if (graphRefreshDelay < 0) {
79             this.graphRefreshDelay = DEFAULT_GRAPH_REFRESH_DELAY;
80         } else {
81             this.graphRefreshDelay = graphRefreshDelay;
82         }
83     }
84
85     public void setTopologyId(String topologyId) {
86         if (topologyId == null || topologyId.isEmpty()) {
87             this.topologyId = DEFAULT_TOPOLOGY_ID;
88         } else {
89             this.topologyId = topologyId;
90         }
91     }
92
93     /**
94      * Registers as a data listener to receive changes done to {@link org.opendaylight.yang.gen.v1.urn.tbd.params
95      * .xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
96      * under
97      * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
98      * operation data root.
99      */
100     public ListenerRegistration<TopologyLinkDataChangeHandler> registerAsDataChangeListener() {
101         InstanceIdentifier<Link> linkInstance = InstanceIdentifier.builder(NetworkTopology.class)
102                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Link.class).build();
103         return dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
104                 LogicalDatastoreType.OPERATIONAL, linkInstance), this);
105     }
106
107     /**
108      * Handler for onDataChanged events and schedules the building of the
109      * network graph.
110      */
111     @Override
112     public void onDataTreeChanged(Collection<DataTreeModification<Link>> changes) {
113         boolean isGraphUpdated = false;
114
115         for (DataTreeModification<Link> change: changes) {
116             DataObjectModification<Link> rootNode = change.getRootNode();
117             switch (rootNode.getModificationType()) {
118                 case WRITE:
119                     Link createdLink = rootNode.getDataAfter();
120                     if (rootNode.getDataBefore() == null && !createdLink.getLinkId().getValue().contains("host")) {
121                         isGraphUpdated = true;
122                         LOG.debug("Graph is updated! Added Link {}", createdLink.getLinkId().getValue());
123                     }
124                     break;
125                 case DELETE:
126                     Link deletedLink = rootNode.getDataBefore();
127                     if (!deletedLink.getLinkId().getValue().contains("host")) {
128                         isGraphUpdated = true;
129                         LOG.debug("Graph is updated! Removed Link {}", deletedLink.getLinkId().getValue());
130                         break;
131                     }
132                     break;
133                 default:
134                     break;
135             }
136         }
137
138         if (!isGraphUpdated) {
139             return;
140         }
141         if (!networkGraphRefreshScheduled) {
142             synchronized (this) {
143                 if (!networkGraphRefreshScheduled) {
144                     topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), graphRefreshDelay,
145                             TimeUnit.MILLISECONDS);
146                     networkGraphRefreshScheduled = true;
147                     LOG.debug("Scheduled Graph for refresh.");
148                 }
149             }
150         } else {
151             LOG.debug("Already scheduled for network graph refresh.");
152             threadReschedule = true;
153         }
154     }
155
156     private class TopologyDataChangeEventProcessor implements Runnable {
157
158         @Override
159         public void run() {
160             if (threadReschedule) {
161                 topologyDataChangeEventProcessor.schedule(this, graphRefreshDelay, TimeUnit.MILLISECONDS);
162                 threadReschedule = false;
163                 return;
164             }
165             LOG.debug("In network graph refresh thread.");
166             networkGraphRefreshScheduled = false;
167             networkGraphService.clear();
168             List<Link> links = getLinksFromTopology();
169             if (links == null || links.isEmpty()) {
170                 return;
171             }
172             networkGraphService.addLinks(links);
173             final ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
174             updateNodeConnectorStatus(readWriteTransaction);
175             Futures.addCallback(readWriteTransaction.submit(), new FutureCallback<Void>() {
176                 @Override
177                 public void onSuccess(Void notUsed) {
178                     LOG.debug("TopologyLinkDataChangeHandler write successful for tx :{}",
179                             readWriteTransaction.getIdentifier());
180                 }
181
182                 @Override
183                 public void onFailure(Throwable throwable) {
184                     LOG.error("TopologyLinkDataChangeHandler write transaction {} failed",
185                             readWriteTransaction.getIdentifier(), throwable.getCause());
186                 }
187             }, MoreExecutors.directExecutor());
188             LOG.debug("Done with network graph refresh thread.");
189         }
190
191         private List<Link> getLinksFromTopology() {
192             InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils
193                     .generateTopologyInstanceIdentifier(topologyId);
194             Topology topology = null;
195             ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
196             try {
197                 Optional<Topology> topologyOptional = readOnlyTransaction
198                         .read(LogicalDatastoreType.OPERATIONAL, topologyInstanceIdentifier).get();
199                 if (topologyOptional.isPresent()) {
200                     topology = topologyOptional.get();
201                 }
202             } catch (InterruptedException | ExecutionException e) {
203                 LOG.error("Error reading topology {}", topologyInstanceIdentifier);
204                 readOnlyTransaction.close();
205                 throw new RuntimeException(
206                         "Error reading from operational store, topology : " + topologyInstanceIdentifier, e);
207             }
208             readOnlyTransaction.close();
209             if (topology == null) {
210                 return null;
211             }
212             List<Link> links = topology.getLink();
213             if (links == null || links.isEmpty()) {
214                 return null;
215             }
216             List<Link> internalLinks = new ArrayList<>();
217             for (Link link : links) {
218                 if (!link.getLinkId().getValue().contains("host")) {
219                     internalLinks.add(link);
220                 }
221             }
222             return internalLinks;
223         }
224
225         private void updateNodeConnectorStatus(ReadWriteTransaction readWriteTransaction) {
226             List<Link> allLinks = networkGraphService.getAllLinks();
227             if (allLinks == null || allLinks.isEmpty()) {
228                 return;
229             }
230
231             List<Link> mstLinks = networkGraphService.getLinksInMst();
232             for (Link link : allLinks) {
233                 if (mstLinks != null && !mstLinks.isEmpty() && mstLinks.contains(link)) {
234                     updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Forwarding);
235                     updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Forwarding);
236                 } else {
237                     updateNodeConnector(readWriteTransaction, getSourceNodeConnectorRef(link), StpStatus.Discarding);
238                     updateNodeConnector(readWriteTransaction, getDestNodeConnectorRef(link), StpStatus.Discarding);
239                 }
240             }
241         }
242
243         private NodeConnectorRef getSourceNodeConnectorRef(Link link) {
244             InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier = InstanceIdentifierUtils
245                     .createNodeConnectorIdentifier(link.getSource().getSourceNode().getValue(),
246                             link.getSource().getSourceTp().getValue());
247             return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
248         }
249
250         private NodeConnectorRef getDestNodeConnectorRef(Link link) {
251             InstanceIdentifier<NodeConnector> nodeConnectorInstanceIdentifier = InstanceIdentifierUtils
252                     .createNodeConnectorIdentifier(link.getDestination().getDestNode().getValue(),
253                             link.getDestination().getDestTp().getValue());
254
255             return new NodeConnectorRef(nodeConnectorInstanceIdentifier);
256         }
257
258         private void updateNodeConnector(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef,
259                 StpStatus stpStatus) {
260             StpStatusAwareNodeConnectorBuilder stpStatusAwareNodeConnectorBuilder =
261                     new StpStatusAwareNodeConnectorBuilder().setStatus(stpStatus);
262             checkIfExistAndUpdateNodeConnector(readWriteTransaction, nodeConnectorRef,
263                     stpStatusAwareNodeConnectorBuilder.build());
264         }
265
266         private void checkIfExistAndUpdateNodeConnector(ReadWriteTransaction readWriteTransaction,
267                 NodeConnectorRef nodeConnectorRef, StpStatusAwareNodeConnector stpStatusAwareNodeConnector) {
268             NodeConnector nc = null;
269             try {
270                 Optional<NodeConnector> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL,
271                         (InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue()).get();
272                 if (dataObjectOptional.isPresent()) {
273                     nc = dataObjectOptional.get();
274                 }
275             } catch (InterruptedException | ExecutionException e) {
276                 LOG.error("Error reading node connector {}", nodeConnectorRef.getValue());
277                 readWriteTransaction.submit();
278                 throw new RuntimeException("Error reading from operational store, node connector : " + nodeConnectorRef,
279                         e);
280             }
281
282             if (nc != null) {
283                 if (sameStatusPresent(nc.augmentation(StpStatusAwareNodeConnector.class),
284                         stpStatusAwareNodeConnector.getStatus())) {
285                     return;
286                 }
287
288                 // build instance id for StpStatusAwareNodeConnector
289                 InstanceIdentifier<StpStatusAwareNodeConnector> stpStatusAwareNcInstanceId =
290                         ((InstanceIdentifier<NodeConnector>) nodeConnectorRef
291                                 .getValue()).augmentation(StpStatusAwareNodeConnector.class);
292                 // update StpStatusAwareNodeConnector in operational store
293                 readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, stpStatusAwareNcInstanceId,
294                         stpStatusAwareNodeConnector);
295                 LOG.debug("Merged Stp Status aware node connector in operational {} with status {}",
296                         stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
297             } else {
298                 LOG.error("Unable to update Stp Status node connector {} note present in  operational store",
299                         nodeConnectorRef.getValue());
300             }
301         }
302
303         private boolean sameStatusPresent(StpStatusAwareNodeConnector stpStatusAwareNodeConnector,
304                 StpStatus stpStatus) {
305
306             if (stpStatusAwareNodeConnector == null) {
307                 return false;
308             }
309
310             if (stpStatusAwareNodeConnector.getStatus() == null) {
311                 return false;
312             }
313
314             if (stpStatus.getIntValue() != stpStatusAwareNodeConnector.getStatus().getIntValue()) {
315                 return false;
316             }
317
318             return true;
319         }
320     }
321 }