- Add flow and port stats polling intervals in config.ini
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.Timer;
16 import java.util.TimerTask;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CopyOnWriteArrayList;
21 import java.util.concurrent.LinkedBlockingQueue;
22
23 import org.apache.commons.lang3.tuple.ImmutablePair;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.eclipse.osgi.framework.console.CommandInterpreter;
26 import org.eclipse.osgi.framework.console.CommandProvider;
27 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
28 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
29 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
31 import org.osgi.framework.BundleContext;
32 import org.osgi.framework.FrameworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import org.opendaylight.controller.sal.core.Bandwidth;
37 import org.opendaylight.controller.sal.core.Config;
38 import org.opendaylight.controller.sal.core.ContainerFlow;
39 import org.opendaylight.controller.sal.core.Edge;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.State;
45 import org.opendaylight.controller.sal.core.UpdateType;
46 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48
49 /**
50  * The class describes a shim layer that relays the topology events from
51  * OpenFlow core to various listeners. The notifications are filtered based on
52  * container configurations.
53  */
54 public class TopologyServiceShim implements IDiscoveryService,
55                 IContainerListener, CommandProvider, IRefreshInternalProvider,
56                 IInventoryShimExternalListener {
57     protected static final Logger logger = LoggerFactory
58             .getLogger(TopologyServiceShim.class);
59     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
60     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
61     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
62
63     private BlockingQueue<NotifyEntry> notifyQ;
64     private Thread notifyThread;
65     private BlockingQueue<String> bulkNotifyQ;
66     private Thread ofPluginTopoBulkUpdate;
67     private volatile Boolean shuttingDown = false;
68     private IOFStatisticsManager statsMgr;
69     private Timer pollTimer;
70     private TimerTask txRatePoller;
71     private Thread bwUtilNotifyThread;
72     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
73     private List<NodeConnector> connectorsOverUtilized;
74     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
75                                                    // bandwidth
76
77     class NotifyEntry {
78         String container;
79         Pair<Edge, Set<Property>> edgeProps;
80         UpdateType type;
81
82         NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
83                 UpdateType type) {
84             this.container = container;
85             this.edgeProps = edgeProps;
86             this.type = type;
87         }
88     }
89
90     class TopologyNotify implements Runnable {
91         private final BlockingQueue<NotifyEntry> notifyQ;
92
93         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
94             this.notifyQ = notifyQ;
95         }
96
97         public void run() {
98             while (true) {
99                 try {
100                     NotifyEntry entry = notifyQ.take();
101
102                     ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
103                             .get(entry.container);
104                     topologServiceShimListener.edgeUpdate(
105                             entry.edgeProps.getLeft(), entry.type,
106                             entry.edgeProps.getRight());
107
108                     entry = null;
109                 } catch (InterruptedException e1) {
110                     logger.warn("TopologyNotify interrupted {}", e1.getMessage());
111                     if (shuttingDown) {
112                         return;
113                     }
114                 } catch (Exception e2) {
115                     logger.error("",e2);
116                 }
117             }
118         }
119     }
120
121     class UtilizationUpdate {
122         NodeConnector connector;
123         UpdateType type;
124
125         UtilizationUpdate(NodeConnector connector, UpdateType type) {
126             this.connector = connector;
127             this.type = type;
128         }
129     }
130
131     class BwUtilizationNotify implements Runnable {
132         private final BlockingQueue<UtilizationUpdate> notifyQ;
133
134         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
135             this.notifyQ = notifyQ;
136         }
137
138         public void run() {
139             while (true) {
140                 try {
141                     UtilizationUpdate update = notifyQ.take();
142                     NodeConnector connector = update.connector;
143                     Set<String> containerList = edgeMap.keySet();
144                     for (String container : containerList) {
145                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
146                                 .get(container);
147                         Edge edge = edgePropsMap.get(connector).getLeft();
148                         if (edge.getTailNodeConnector().equals(connector)) {
149                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
150                                     .get(container);
151                             if (update.type == UpdateType.ADDED) {
152                                 topologServiceShimListener
153                                         .edgeOverUtilized(edge);
154                             } else {
155                                 topologServiceShimListener
156                                         .edgeUtilBackToNormal(edge);
157                             }
158                         }
159                     }
160                 } catch (InterruptedException e1) {
161                     logger.warn(
162                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
163                             e1.getMessage());
164                     if (shuttingDown) {
165                         return;
166                     }
167                 } catch (Exception e2) {
168                     logger.error("",e2);
169                 }
170             }
171         }
172     }
173
174     /**
175      * Function called by the dependency manager when all the required
176      * dependencies are satisfied
177      * 
178      */
179     void init() {
180         logger.trace("Init called");
181         connectorsOverUtilized = new ArrayList<NodeConnector>();
182         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
183         notifyThread = new Thread(new TopologyNotify(notifyQ));
184         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
185         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
186         bulkNotifyQ = new LinkedBlockingQueue<String>();
187         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
188             @Override
189             public void run() {
190                 while (true) {
191                     try {
192                         String containerName = bulkNotifyQ.take();
193                         logger.debug("Bulk Notify container:{}", containerName);
194                         TopologyBulkUpdate(containerName);
195                     } catch (InterruptedException e) {
196                         logger.warn("Topology Bulk update thread interrupted");
197                         if (shuttingDown) {
198                             return;
199                         }
200                     }
201                 }
202             }
203         }, "Topology Bulk Update");
204
205         // Initialize node connector tx bit rate poller timer
206         pollTimer = new Timer();
207         txRatePoller = new TimerTask() {
208             @Override
209             public void run() {
210                 pollTxBitRates();
211             }
212         };
213
214         registerWithOSGIConsole();
215     }
216
217     /**
218      * Continuously polls the transmit bit rate for all the node connectors from
219      * statistics manager and trigger the warning notification upward when the
220      * transmit rate is above a threshold which is a percentage of the edge
221      * bandwidth
222      */
223     protected void pollTxBitRates() {
224         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
225                 .get(GlobalConstants.DEFAULT.toString());
226         if (globalContainerEdges == null) {
227             return;
228         }
229
230         for (NodeConnector connector : globalContainerEdges.keySet()) {
231             // Skip if node connector belongs to production switch
232             if (connector.getType().equals(
233                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
234                 continue;
235             }
236
237             // Get edge for which this node connector is head
238             Pair<Edge, Set<Property>> props = this.edgeMap.get(
239                     GlobalConstants.DEFAULT.toString()).get(connector);
240             // On switch mgr restart the props get reset
241             if (props == null) {
242                 continue;
243             }
244             Set<Property> propSet = props.getRight();
245             if (propSet == null) {
246                 continue;
247             }
248
249             float bw = 0;
250             for (Property prop : propSet) {
251                 if (prop instanceof Bandwidth) {
252                     bw = ((Bandwidth) prop).getValue();
253                     break;
254                 }
255             }
256
257             // Skip if agent did not provide a bandwidth info for the edge
258             if (bw == 0) {
259                 continue;
260             }
261
262             // Compare bandwidth usage
263             Long switchId = (Long) connector.getNode().getID();
264             Short port = (Short) connector.getID();
265             float rate = statsMgr.getTransmitRate(switchId, port);
266             if (rate > bwThresholdFactor * bw) {
267                 if (!connectorsOverUtilized.contains(connector)) {
268                     connectorsOverUtilized.add(connector);
269                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
270                             UpdateType.ADDED));
271                 }
272             } else {
273                 if (connectorsOverUtilized.contains(connector)) {
274                     connectorsOverUtilized.remove(connector);
275                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
276                             UpdateType.REMOVED));
277                 }
278             }
279         }
280
281     }
282
283     /**
284      * Function called by the dependency manager when at least one dependency
285      * become unsatisfied or when the component is shutting down because for
286      * example bundle is being stopped.
287      * 
288      */
289     void destroy() {
290         logger.trace("DESTROY called!");
291         notifyQ = null;
292         notifyThread = null;
293     }
294
295     /**
296      * Function called by dependency manager after "init ()" is called and after
297      * the services provided by the class are registered in the service registry
298      * 
299      */
300     void start() {
301         logger.trace("START called!");
302         notifyThread.start();
303         bwUtilNotifyThread.start();
304         ofPluginTopoBulkUpdate.start();
305         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
306     }
307
308     /**
309      * Function called by the dependency manager before the services exported by
310      * the component are unregistered, this will be followed by a "destroy ()"
311      * calls
312      * 
313      */
314     void stop() {
315         logger.trace("STOP called!");
316         shuttingDown = true;
317         notifyThread.interrupt();
318     }
319
320     void setTopologyServiceShimListener(Map<?, ?> props,
321             ITopologyServiceShimListener s) {
322         if (props == null) {
323             logger.error("Didn't receive the service properties");
324             return;
325         }
326         String containerName = (String) props.get("containerName");
327         if (containerName == null) {
328             logger.error("containerName not supplied");
329             return;
330         }
331         if ((this.topologyServiceShimListeners != null)
332                 && !this.topologyServiceShimListeners
333                         .containsKey(containerName)) {
334             this.topologyServiceShimListeners.put(containerName, s);
335             logger.trace("Added topologyServiceShimListener for container: {}",
336                     containerName);
337         }
338     }
339
340     void unsetTopologyServiceShimListener(Map<?, ?> props,
341             ITopologyServiceShimListener s) {
342         if (props == null) {
343             logger.error("Didn't receive the service properties");
344             return;
345         }
346         String containerName = (String) props.get("containerName");
347         if (containerName == null) {
348             logger.error("containerName not supplied");
349             return;
350         }
351         if ((this.topologyServiceShimListeners != null)
352                 && this.topologyServiceShimListeners.containsKey(containerName)
353                 && this.topologyServiceShimListeners.get(containerName).equals(
354                         s)) {
355             this.topologyServiceShimListeners.remove(containerName);
356             logger.trace("Removed topologyServiceShimListener for container: {}",
357                     containerName);
358         }
359     }
360
361     void setStatisticsManager(IOFStatisticsManager s) {
362         this.statsMgr = s;
363     }
364
365     void unsetStatisticsManager(IOFStatisticsManager s) {
366         if (this.statsMgr == s) {
367             this.statsMgr = null;
368         }
369     }
370
371     private void removeNodeConnector(String container,
372             NodeConnector nodeConnector) {
373         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
374                 .get(container);
375         if (edgePropsMap == null) {
376             return;
377         }
378
379         // Remove edge in one direction
380         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
381         if (edgeProps == null) {
382             return;
383         }
384         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
385
386         // Remove edge in another direction
387         edgeProps = edgePropsMap
388                 .get(edgeProps.getLeft().getHeadNodeConnector());
389         if (edgeProps == null) {
390             return;
391         }
392         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
393     }
394
395     private void notifyEdge(String container, Edge edge, UpdateType type,
396             Set<Property> props) {
397         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
398                 .get(container);
399         NodeConnector src = edge.getTailNodeConnector();
400         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
401                 edge, props);
402
403         switch (type) {
404         case ADDED:
405         case CHANGED:
406             if (edgePropsMap == null) {
407                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
408             } else {
409                 if (edgePropsMap.containsKey(src)
410                         && edgePropsMap.get(src).equals(edgeProps)) {
411                     // Entry already exists. Return here.
412                     return;
413                 }
414             }
415             edgePropsMap.put(src, edgeProps);
416             edgeMap.put(container, edgePropsMap);
417             break;
418         case REMOVED:
419             if (edgePropsMap != null) {
420                 edgePropsMap.remove(src);
421                 if (edgePropsMap.isEmpty()) {
422                     edgeMap.remove(container);
423                 } else {
424                     edgeMap.put(container, edgePropsMap);
425                 }
426             }
427             break;
428         default:
429             logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
430                     type, edge, container);
431             return;
432         }
433
434         notifyQ.add(new NotifyEntry(container, edgeProps, type));
435
436         logger.debug("notifyEdge: {} Edge {} in container {}",
437                 type, edge, container);
438     }
439
440     @Override
441     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
442         if ((edge == null) || (type == null)) {
443             return;
444         }
445
446         // Notify default container
447         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
448
449         // Notify the corresponding containers
450         List<String> containers = getEdgeContainers(edge);
451         if (containers != null) {
452             for (String container : containers) {
453                 notifyEdge(container, edge, type, props);
454             }
455         }
456     }
457
458     /*
459      * Return a list of containers the edge associated with
460      */
461     private List<String> getEdgeContainers(Edge edge) {
462         NodeConnector src = edge.getTailNodeConnector(), dst = edge
463                 .getHeadNodeConnector();
464
465         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
466             /* Find the common containers for both ends */
467             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
468                     .get(dst), cmnContainers = null;
469             if ((srcContainers != null) && (dstContainers != null)) {
470                 cmnContainers = new ArrayList<String>(srcContainers);
471                 cmnContainers.retainAll(dstContainers);
472             }
473             return cmnContainers;
474         } else {
475             /*
476              * If the neighbor is part of a monitored production network, get
477              * the containers that the edge port belongs to
478              */
479             return this.containerMap.get(dst);
480         }
481     }
482
483     @Override
484     public void tagUpdated(String containerName, Node n, short oldTag,
485             short newTag, UpdateType t) {
486     }
487
488     @Override
489     public void containerFlowUpdated(String containerName,
490             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
491     }
492
493     @Override
494     public void nodeConnectorUpdated(String containerName, NodeConnector p,
495             UpdateType t) {
496         if (this.containerMap == null) {
497             logger.error("containerMap is NULL");
498             return;
499         }
500         List<String> containers = this.containerMap.get(p);
501         if (containers == null) {
502             containers = new CopyOnWriteArrayList<String>();
503         }
504         boolean updateMap = false;
505         switch (t) {
506         case ADDED:
507             if (!containers.contains(containerName)) {
508                 containers.add(containerName);
509                 updateMap = true;
510             }
511             break;
512         case REMOVED:
513             if (containers.contains(containerName)) {
514                 containers.remove(containerName);
515                 updateMap = true;
516                 removeNodeConnector(containerName, p);
517             }
518             break;
519         case CHANGED:
520             break;
521         }
522         if (updateMap) {
523             if (containers.isEmpty()) {
524                 // Do cleanup to reduce memory footprint if no
525                 // elements to be tracked
526                 this.containerMap.remove(p);
527             } else {
528                 this.containerMap.put(p, containers);
529             }
530         }
531     }
532
533     @Override
534     public void containerModeUpdated(UpdateType t) {
535         // do nothing
536     }
537
538     private void registerWithOSGIConsole() {
539         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
540                 .getBundleContext();
541         bundleContext.registerService(CommandProvider.class.getName(), this,
542                 null);
543     }
544
545     @Override
546     public String getHelp() {
547         StringBuffer help = new StringBuffer();
548         help.append("---Topology Service Shim---\n");
549         help.append("\t pem [container]               - Print edgeMap entries");
550         help.append(" for a given container\n");
551         return help.toString();
552     }
553
554     public void _pem(CommandInterpreter ci) {
555         String container = ci.nextArgument();
556         if (container == null) {
557             container = GlobalConstants.DEFAULT.toString();
558         }
559
560         ci.println("Container: " + container);
561         ci.println("                             Edge                                          Bandwidth");
562
563         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
564                 .get(container);
565         if (edgePropsMap == null) {
566             return;
567         }
568         int count = 0;
569         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
570             if (edgeProps == null) {
571                 continue;
572             }
573
574             long bw = 0;
575             for (Property prop : edgeProps.getRight()) {
576                 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
577                     bw = ((Bandwidth) prop).getValue();
578                 }
579             }
580             count++;
581             ci.println(edgeProps.getLeft() + "          " + bw);
582         }
583         ci.println("Total number of Edges: " + count);
584     }
585
586     public void _bwfactor(CommandInterpreter ci) {
587         String factorString = ci.nextArgument();
588         if (factorString == null) {
589             ci.println("Bw threshold: " + this.bwThresholdFactor);
590             ci.println("Insert a non null bw threshold");
591             return;
592         }
593         bwThresholdFactor = Float.parseFloat(factorString);
594         ci.println("New Bw threshold: " + this.bwThresholdFactor);
595     }
596
597     /**
598      * This method will trigger topology updates to be sent toward SAL. SAL then
599      * pushes the updates to ALL the applications that have registered as
600      * listeners for this service. SAL has no way of knowing which application
601      * requested for the refresh.
602      * 
603      * As an example of this case, is stopping and starting the Topology
604      * Manager. When the topology Manager is stopped, and restarted, it will no
605      * longer have the latest topology. Hence, a request is sent here.
606      * 
607      * @param containerName
608      * @return void
609      */
610     @Override
611     public void requestRefresh(String containerName) {
612         // wake up a bulk update thread and exit
613         // the thread will execute the bulkUpdate()
614         bulkNotifyQ.add(containerName);
615     }
616
617     /**
618      * Reading the current topology database, the method will replay all the
619      * edge updates for the ITopologyServiceShimListener instance in the given
620      * container, which will in turn publish them toward SAL.
621      * 
622      * @param containerName
623      */
624     private void TopologyBulkUpdate(String containerName) {
625         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
626
627         logger.debug("Try bulk update for container:{}", containerName);
628         edgePropMap = edgeMap.get(containerName);
629         if (edgePropMap == null) {
630             logger.debug("No edges known for container:{}", containerName);
631             return;
632         }
633         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
634                 .get(containerName);
635         if (topologServiceShimListener == null) {
636             logger.debug("No topology service shim listener for container:{}",
637                     containerName);
638             return;
639         }
640         int i = 0;
641         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
642             if (edgeProps != null) {
643                 i++;
644                 logger.trace("Add edge {}", edgeProps.getLeft());
645                 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
646                         UpdateType.ADDED, edgeProps.getRight());
647             }
648         }
649         logger.debug("Sent {} updates", i);
650     }
651
652     @Override
653     public void updateNode(Node node, UpdateType type, Set<Property> props) {
654     }
655
656     @Override
657     public void updateNodeConnector(NodeConnector nodeConnector,
658             UpdateType type, Set<Property> props) {
659         List<String> containers = new ArrayList<String>();
660         List<String> conList = this.containerMap.get(nodeConnector);
661
662         containers.add(GlobalConstants.DEFAULT.toString());
663         if (conList != null) {
664             containers.addAll(conList);
665         }
666         
667         switch (type) {
668         case ADDED:
669             break;
670         case CHANGED:
671             if (props == null) {
672                 break;
673             }
674
675             boolean rmEdge = false;
676             for (Property prop : props) {
677                 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
678                         || ((prop instanceof State) && (((State) prop)
679                                 .getValue() != State.EDGE_UP))) {
680                     /*
681                      * If port admin down or link down, remove the edges
682                      * associated with the port
683                      */
684                     rmEdge = true;
685                     break;
686                 }
687             }
688
689             if (rmEdge) {
690                 for (String cName : containers) {
691                     removeNodeConnector(cName, nodeConnector);
692                 }
693             }
694             break;
695         case REMOVED:
696             for (String cName : containers) {
697                 removeNodeConnector(cName, nodeConnector);
698             }
699             break;
700         default:
701             break;
702         }
703     }
704 }