- Plugin sends Barrier msg every 100 async msgs (configurable thru config.ini: of...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.Timer;
16 import java.util.TimerTask;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CopyOnWriteArrayList;
21 import java.util.concurrent.LinkedBlockingQueue;
22
23 import org.apache.commons.lang3.tuple.ImmutablePair;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.eclipse.osgi.framework.console.CommandInterpreter;
26 import org.eclipse.osgi.framework.console.CommandProvider;
27 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
28 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
29 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
30 import org.osgi.framework.BundleContext;
31 import org.osgi.framework.FrameworkUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import org.opendaylight.controller.sal.core.Bandwidth;
36 import org.opendaylight.controller.sal.core.ContainerFlow;
37 import org.opendaylight.controller.sal.core.Edge;
38 import org.opendaylight.controller.sal.core.IContainerListener;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
44 import org.opendaylight.controller.sal.utils.GlobalConstants;
45
46 /**
47  * The class describes a shim layer that relays the topology events from
48  * OpenFlow core to various listeners. The notifications are filtered based on
49  * container configurations.
50  */
51 public class TopologyServiceShim implements IDiscoveryService,
52         IContainerListener, CommandProvider, IRefreshInternalProvider {
53     protected static final Logger logger = LoggerFactory
54             .getLogger(TopologyServiceShim.class);
55     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
56     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
57     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
58
59     private BlockingQueue<NotifyEntry> notifyQ;
60     private Thread notifyThread;
61     private BlockingQueue<String> bulkNotifyQ;
62     private Thread ofPluginTopoBulkUpdate;
63     private volatile Boolean shuttingDown = false;
64     private IOFStatisticsManager statsMgr;
65     private Timer pollTimer;
66     private TimerTask txRatePoller;
67     private Thread bwUtilNotifyThread;
68     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
69     private List<NodeConnector> connectorsOverUtilized;
70     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
71                                                    // bandwidth
72
73     class NotifyEntry {
74         String container;
75         Pair<Edge, Set<Property>> edgeProps;
76         UpdateType type;
77
78         NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
79                 UpdateType type) {
80             this.container = container;
81             this.edgeProps = edgeProps;
82             this.type = type;
83         }
84     }
85
86     class TopologyNotify implements Runnable {
87         private final BlockingQueue<NotifyEntry> notifyQ;
88
89         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
90             this.notifyQ = notifyQ;
91         }
92
93         public void run() {
94             while (true) {
95                 try {
96                     NotifyEntry entry = notifyQ.take();
97
98                     ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
99                             .get(entry.container);
100                     topologServiceShimListener.edgeUpdate(
101                             entry.edgeProps.getLeft(), entry.type,
102                             entry.edgeProps.getRight());
103
104                     entry = null;
105                 } catch (InterruptedException e1) {
106                     logger.warn("TopologyNotify interrupted {}", e1.getMessage());
107                     if (shuttingDown) {
108                         return;
109                     }
110                 } catch (Exception e2) {
111                     logger.error("",e2);
112                 }
113             }
114         }
115     }
116
117     class UtilizationUpdate {
118         NodeConnector connector;
119         UpdateType type;
120
121         UtilizationUpdate(NodeConnector connector, UpdateType type) {
122             this.connector = connector;
123             this.type = type;
124         }
125     }
126
127     class BwUtilizationNotify implements Runnable {
128         private final BlockingQueue<UtilizationUpdate> notifyQ;
129
130         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
131             this.notifyQ = notifyQ;
132         }
133
134         public void run() {
135             while (true) {
136                 try {
137                     UtilizationUpdate update = notifyQ.take();
138                     NodeConnector connector = update.connector;
139                     Set<String> containerList = edgeMap.keySet();
140                     for (String container : containerList) {
141                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
142                                 .get(container);
143                         Edge edge = edgePropsMap.get(connector).getLeft();
144                         if (edge.getTailNodeConnector().equals(connector)) {
145                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
146                                     .get(container);
147                             if (update.type == UpdateType.ADDED) {
148                                 topologServiceShimListener
149                                         .edgeOverUtilized(edge);
150                             } else {
151                                 topologServiceShimListener
152                                         .edgeUtilBackToNormal(edge);
153                             }
154                         }
155                     }
156                 } catch (InterruptedException e1) {
157                     logger.warn(
158                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
159                             e1.getMessage());
160                     if (shuttingDown) {
161                         return;
162                     }
163                 } catch (Exception e2) {
164                     logger.error("",e2);
165                 }
166             }
167         }
168     }
169
170     /**
171      * Function called by the dependency manager when all the required
172      * dependencies are satisfied
173      * 
174      */
175     void init() {
176         logger.trace("Init called");
177         connectorsOverUtilized = new ArrayList<NodeConnector>();
178         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
179         notifyThread = new Thread(new TopologyNotify(notifyQ));
180         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
181         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
182         bulkNotifyQ = new LinkedBlockingQueue<String>();
183         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
184             @Override
185             public void run() {
186                 while (true) {
187                     try {
188                         String containerName = bulkNotifyQ.take();
189                         logger.debug("Bulk Notify container:{}", containerName);
190                         TopologyBulkUpdate(containerName);
191                     } catch (InterruptedException e) {
192                         logger.warn("Topology Bulk update thread interrupted");
193                         if (shuttingDown) {
194                             return;
195                         }
196                     }
197                 }
198             }
199         }, "Topology Bulk Update");
200
201         // Initialize node connector tx bit rate poller timer
202         pollTimer = new Timer();
203         txRatePoller = new TimerTask() {
204             @Override
205             public void run() {
206                 pollTxBitRates();
207             }
208         };
209
210         registerWithOSGIConsole();
211     }
212
213     /**
214      * Continuously polls the transmit bit rate for all the node connectors from
215      * statistics manager and trigger the warning notification upward when the
216      * transmit rate is above a threshold which is a percentage of the edge
217      * bandwidth
218      */
219     protected void pollTxBitRates() {
220         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
221                 .get(GlobalConstants.DEFAULT.toString());
222         if (globalContainerEdges == null) {
223             return;
224         }
225
226         for (NodeConnector connector : globalContainerEdges.keySet()) {
227             // Skip if node connector belongs to production switch
228             if (connector.getType().equals(
229                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
230                 continue;
231             }
232
233             // Get edge for which this node connector is head
234             Pair<Edge, Set<Property>> props = this.edgeMap.get(
235                     GlobalConstants.DEFAULT.toString()).get(connector);
236             // On switch mgr restart the props get reset
237             if (props == null) {
238                 continue;
239             }
240             Set<Property> propSet = props.getRight();
241             if (propSet == null) {
242                 continue;
243             }
244
245             float bw = 0;
246             for (Property prop : propSet) {
247                 if (prop instanceof Bandwidth) {
248                     bw = ((Bandwidth) prop).getValue();
249                     break;
250                 }
251             }
252
253             // Skip if agent did not provide a bandwidth info for the edge
254             if (bw == 0) {
255                 continue;
256             }
257
258             // Compare bandwidth usage
259             Long switchId = (Long) connector.getNode().getID();
260             Short port = (Short) connector.getID();
261             float rate = statsMgr.getTransmitRate(switchId, port);
262             if (rate > bwThresholdFactor * bw) {
263                 if (!connectorsOverUtilized.contains(connector)) {
264                     connectorsOverUtilized.add(connector);
265                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
266                             UpdateType.ADDED));
267                 }
268             } else {
269                 if (connectorsOverUtilized.contains(connector)) {
270                     connectorsOverUtilized.remove(connector);
271                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
272                             UpdateType.REMOVED));
273                 }
274             }
275         }
276
277     }
278
    /**
     * Function called by the dependency manager when at least one dependency
     * becomes unsatisfied or when the component is shutting down, for example
     * because the bundle is being stopped.
     *
     * Drops the references to the edge notification queue and to the thread
     * that drains it. The other queues, threads and the poll timer created by
     * init() are left untouched here.
     */
    void destroy() {
        logger.trace("DESTROY called!");
        notifyQ = null;
        notifyThread = null;
    }
290
291     /**
292      * Function called by dependency manager after "init ()" is called and after
293      * the services provided by the class are registered in the service registry
294      * 
295      */
296     void start() {
297         logger.trace("START called!");
298         notifyThread.start();
299         bwUtilNotifyThread.start();
300         ofPluginTopoBulkUpdate.start();
301         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
302     }
303
304     /**
305      * Function called by the dependency manager before the services exported by
306      * the component are unregistered, this will be followed by a "destroy ()"
307      * calls
308      * 
309      */
310     void stop() {
311         logger.trace("STOP called!");
312         shuttingDown = true;
313         notifyThread.interrupt();
314     }
315
316     void setTopologyServiceShimListener(Map<?, ?> props,
317             ITopologyServiceShimListener s) {
318         if (props == null) {
319             logger.error("Didn't receive the service properties");
320             return;
321         }
322         String containerName = (String) props.get("containerName");
323         if (containerName == null) {
324             logger.error("containerName not supplied");
325             return;
326         }
327         if ((this.topologyServiceShimListeners != null)
328                 && !this.topologyServiceShimListeners
329                         .containsKey(containerName)) {
330             this.topologyServiceShimListeners.put(containerName, s);
331             logger.trace("Added topologyServiceShimListener for container: {}",
332                     containerName);
333         }
334     }
335
336     void unsetTopologyServiceShimListener(Map<?, ?> props,
337             ITopologyServiceShimListener s) {
338         if (props == null) {
339             logger.error("Didn't receive the service properties");
340             return;
341         }
342         String containerName = (String) props.get("containerName");
343         if (containerName == null) {
344             logger.error("containerName not supplied");
345             return;
346         }
347         if ((this.topologyServiceShimListeners != null)
348                 && this.topologyServiceShimListeners.containsKey(containerName)
349                 && this.topologyServiceShimListeners.get(containerName).equals(
350                         s)) {
351             this.topologyServiceShimListeners.remove(containerName);
352             logger.trace("Removed topologyServiceShimListener for container: {}",
353                     containerName);
354         }
355     }
356
    /**
     * Stores the statistics manager that pollTxBitRates() queries for the
     * transmit rate of a switch port.
     *
     * @param s
     *            the statistics manager to use
     */
    void setStatisticsManager(IOFStatisticsManager s) {
        this.statsMgr = s;
    }
360
361     void unsetStatisticsManager(IOFStatisticsManager s) {
362         if (this.statsMgr == s) {
363             this.statsMgr = null;
364         }
365     }
366
367     private void removeNodeConnector(String container,
368             NodeConnector nodeConnector) {
369         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
370                 .get(container);
371         if (edgePropsMap == null) {
372             return;
373         }
374
375         // Remove edge in one direction
376         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
377         if (edgeProps == null) {
378             return;
379         }
380         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
381
382         // Remove edge in another direction
383         edgeProps = edgePropsMap
384                 .get(edgeProps.getLeft().getHeadNodeConnector());
385         if (edgeProps == null) {
386             return;
387         }
388         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
389     }
390
391     private void notifyEdge(String container, Edge edge, UpdateType type,
392             Set<Property> props) {
393         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
394                 .get(container);
395         NodeConnector src = edge.getTailNodeConnector();
396         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
397                 edge, props);
398
399         switch (type) {
400         case ADDED:
401         case CHANGED:
402             if (edgePropsMap == null) {
403                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
404             } else {
405                 if (edgePropsMap.containsKey(src)
406                         && edgePropsMap.get(src).equals(edgeProps)) {
407                     // Entry already exists. Return here.
408                     return;
409                 }
410             }
411             edgePropsMap.put(src, edgeProps);
412             edgeMap.put(container, edgePropsMap);
413             break;
414         case REMOVED:
415             if (edgePropsMap != null) {
416                 edgePropsMap.remove(src);
417                 if (edgePropsMap.isEmpty()) {
418                     edgeMap.remove(container);
419                 } else {
420                     edgeMap.put(container, edgePropsMap);
421                 }
422             }
423             break;
424         default:
425             logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
426                     type, edge, container);
427             return;
428         }
429
430         notifyQ.add(new NotifyEntry(container, edgeProps, type));
431
432         logger.debug("notifyEdge: {} Edge {} in container {}",
433                 type, edge, container);
434     }
435
436     @Override
437     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
438         if ((edge == null) || (type == null)) {
439             return;
440         }
441
442         // Notify default container
443         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
444
445         // Notify the corresponding containers
446         List<String> containers = getEdgeContainers(edge);
447         if (containers != null) {
448             for (String container : containers) {
449                 notifyEdge(container, edge, type, props);
450             }
451         }
452     }
453
454     /*
455      * Return a list of containers the edge associated with
456      */
457     private List<String> getEdgeContainers(Edge edge) {
458         NodeConnector src = edge.getTailNodeConnector(), dst = edge
459                 .getHeadNodeConnector();
460
461         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
462             /* Find the common containers for both ends */
463             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
464                     .get(dst), cmnContainers = null;
465             if ((srcContainers != null) && (dstContainers != null)) {
466                 cmnContainers = new ArrayList<String>(srcContainers);
467                 cmnContainers.retainAll(dstContainers);
468             }
469             return cmnContainers;
470         } else {
471             /*
472              * If the neighbor is part of a monitored production network, get
473              * the containers that the edge port belongs to
474              */
475             return this.containerMap.get(dst);
476         }
477     }
478
    @Override
    public void tagUpdated(String containerName, Node n, short oldTag,
            short newTag, UpdateType t) {
        // do nothing: the shim takes no action on tag updates
    }
483
    @Override
    public void containerFlowUpdated(String containerName,
            ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
        // do nothing: the shim takes no action on container flow updates
    }
488
489     @Override
490     public void nodeConnectorUpdated(String containerName, NodeConnector p,
491             UpdateType t) {
492         if (this.containerMap == null) {
493             logger.error("containerMap is NULL");
494             return;
495         }
496         List<String> containers = this.containerMap.get(p);
497         if (containers == null) {
498             containers = new CopyOnWriteArrayList<String>();
499         }
500         boolean updateMap = false;
501         switch (t) {
502         case ADDED:
503             if (!containers.contains(containerName)) {
504                 containers.add(containerName);
505                 updateMap = true;
506             }
507             break;
508         case REMOVED:
509             if (containers.contains(containerName)) {
510                 containers.remove(containerName);
511                 updateMap = true;
512                 removeNodeConnector(containerName, p);
513             }
514             break;
515         case CHANGED:
516             break;
517         }
518         if (updateMap) {
519             if (containers.isEmpty()) {
520                 // Do cleanup to reduce memory footprint if no
521                 // elements to be tracked
522                 this.containerMap.remove(p);
523             } else {
524                 this.containerMap.put(p, containers);
525             }
526         }
527     }
528
    @Override
    public void containerModeUpdated(UpdateType t) {
        // do nothing: the shim takes no action on container mode updates
    }
533
534     private void registerWithOSGIConsole() {
535         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
536                 .getBundleContext();
537         bundleContext.registerService(CommandProvider.class.getName(), this,
538                 null);
539     }
540
541     @Override
542     public String getHelp() {
543         StringBuffer help = new StringBuffer();
544         help.append("---Topology Service Shim---\n");
545         help.append("\t pem [container]               - Print edgeMap entries");
546         help.append(" for a given container\n");
547         return help.toString();
548     }
549
550     public void _pem(CommandInterpreter ci) {
551         String container = ci.nextArgument();
552         if (container == null) {
553             container = GlobalConstants.DEFAULT.toString();
554         }
555
556         ci.println("Container: " + container);
557         ci.println("                             Edge                                          Bandwidth");
558
559         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
560                 .get(container);
561         if (edgePropsMap == null) {
562             return;
563         }
564         int count = 0;
565         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
566             if (edgeProps == null) {
567                 continue;
568             }
569
570             long bw = 0;
571             for (Property prop : edgeProps.getRight()) {
572                 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
573                     bw = ((Bandwidth) prop).getValue();
574                 }
575             }
576             count++;
577             ci.println(edgeProps.getLeft() + "          " + bw);
578         }
579         ci.println("Total number of Edges: " + count);
580     }
581
582     public void _bwfactor(CommandInterpreter ci) {
583         String factorString = ci.nextArgument();
584         if (factorString == null) {
585             ci.println("Bw threshold: " + this.bwThresholdFactor);
586             ci.println("Insert a non null bw threshold");
587             return;
588         }
589         bwThresholdFactor = Float.parseFloat(factorString);
590         ci.println("New Bw threshold: " + this.bwThresholdFactor);
591     }
592
    /**
     * This method will trigger topology updates to be sent toward SAL. SAL then
     * pushes the updates to ALL the applications that have registered as
     * listeners for this service. SAL has no way of knowing which application
     * requested the refresh.
     *
     * An example of this case is stopping and starting the Topology Manager.
     * When the Topology Manager is stopped and restarted, it will no longer
     * have the latest topology. Hence, a request is sent here.
     *
     * The request is only queued here; the replay itself is carried out
     * asynchronously by the bulk update thread.
     *
     * @param containerName
     *            name of the container whose topology has to be replayed
     */
    @Override
    public void requestRefresh(String containerName) {
        // wake up the bulk update thread and exit;
        // the thread will execute TopologyBulkUpdate()
        bulkNotifyQ.add(containerName);
    }
612
613     /**
614      * Reading the current topology database, the method will replay all the
615      * edge updates for the ITopologyServiceShimListener instance in the given
616      * container, which will in turn publish them toward SAL.
617      * 
618      * @param containerName
619      */
620     private void TopologyBulkUpdate(String containerName) {
621         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
622
623         logger.debug("Try bulk update for container:{}", containerName);
624         edgePropMap = edgeMap.get(containerName);
625         if (edgePropMap == null) {
626             logger.debug("No edges known for container:{}", containerName);
627             return;
628         }
629         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
630                 .get(containerName);
631         if (topologServiceShimListener == null) {
632             logger.debug("No topology service shim listener for container:{}",
633                     containerName);
634             return;
635         }
636         int i = 0;
637         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
638             if (edgeProps != null) {
639                 i++;
640                 logger.trace("Add edge {}", edgeProps.getLeft());
641                 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
642                         UpdateType.ADDED, edgeProps.getRight());
643             }
644         }
645         logger.debug("Sent {} updates", i);
646     }
647
648 }