Merge "Topology Service related enhancement and Style changes. Signed-off-by: Madhava...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.Timer;
17 import java.util.TimerTask;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 import org.apache.commons.lang3.tuple.ImmutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.eclipse.osgi.framework.console.CommandInterpreter;
27 import org.eclipse.osgi.framework.console.CommandProvider;
28 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
29 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
31 import org.osgi.framework.BundleContext;
32 import org.osgi.framework.FrameworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import org.opendaylight.controller.sal.core.Bandwidth;
37 import org.opendaylight.controller.sal.core.ContainerFlow;
38 import org.opendaylight.controller.sal.core.Edge;
39 import org.opendaylight.controller.sal.core.IContainerListener;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
45 import org.opendaylight.controller.sal.utils.GlobalConstants;
46
47 /**
48  * The class describes a shim layer that relays the topology events from
49  * OpenFlow core to various listeners. The notifications are filtered based on
50  * container configurations.
51  */
52 public class TopologyServiceShim implements IDiscoveryService,
53         IContainerListener, CommandProvider, IRefreshInternalProvider {
54     protected static final Logger logger = LoggerFactory
55             .getLogger(TopologyServiceShim.class);
56     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
57     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
58     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
59
60     private BlockingQueue<NotifyEntry> notifyQ;
61     private Thread notifyThread;
62     private BlockingQueue<String> bulkNotifyQ;
63     private Thread ofPluginTopoBulkUpdate;
64     private volatile Boolean shuttingDown = false;
65     private IOFStatisticsManager statsMgr;
66     private Timer pollTimer;
67     private TimerTask txRatePoller;
68     private Thread bwUtilNotifyThread;
69     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
70     private List<NodeConnector> connectorsOverUtilized;
71     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
72                                                    // bandwidth
73
74     class NotifyEntry {
75         String container;
76         Pair<Edge, Set<Property>> edgeProps;
77         UpdateType type;
78
79         NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
80                 UpdateType type) {
81             this.container = container;
82             this.edgeProps = edgeProps;
83             this.type = type;
84         }
85     }
86
87     class TopologyNotify implements Runnable {
88         private final BlockingQueue<NotifyEntry> notifyQ;
89
90         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
91             this.notifyQ = notifyQ;
92         }
93
94         public void run() {
95             while (true) {
96                 try {
97                     NotifyEntry entry = notifyQ.take();
98
99                     ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
100                             .get(entry.container);
101                     topologServiceShimListener.edgeUpdate(
102                             entry.edgeProps.getLeft(), entry.type,
103                             entry.edgeProps.getRight());
104
105                     entry = null;
106                 } catch (InterruptedException e1) {
107                     logger.warn("TopologyNotify interrupted", e1.getMessage());
108                     if (shuttingDown) {
109                         return;
110                     }
111                 } catch (Exception e2) {
112                     e2.printStackTrace();
113                 }
114             }
115         }
116     }
117
118     class UtilizationUpdate {
119         NodeConnector connector;
120         UpdateType type;
121
122         UtilizationUpdate(NodeConnector connector, UpdateType type) {
123             this.connector = connector;
124             this.type = type;
125         }
126     }
127
128     class BwUtilizationNotify implements Runnable {
129         private final BlockingQueue<UtilizationUpdate> notifyQ;
130
131         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
132             this.notifyQ = notifyQ;
133         }
134
135         public void run() {
136             while (true) {
137                 try {
138                     UtilizationUpdate update = notifyQ.take();
139                     NodeConnector connector = update.connector;
140                     Set<String> containerList = edgeMap.keySet();
141                     for (String container : containerList) {
142                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
143                                 .get(container);
144                         Edge edge = edgePropsMap.get(connector).getLeft();
145                         if (edge.getTailNodeConnector().equals(connector)) {
146                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
147                                     .get(container);
148                             if (update.type == UpdateType.ADDED) {
149                                 topologServiceShimListener
150                                         .edgeOverUtilized(edge);
151                             } else {
152                                 topologServiceShimListener
153                                         .edgeUtilBackToNormal(edge);
154                             }
155                         }
156                     }
157                 } catch (InterruptedException e1) {
158                     logger.warn(
159                             "Edge Bandwidth Utilization Notify Thread interrupted",
160                             e1.getMessage());
161                     if (shuttingDown) {
162                         return;
163                     }
164                 } catch (Exception e2) {
165                     e2.printStackTrace();
166                 }
167             }
168         }
169     }
170
171     /**
172      * Function called by the dependency manager when all the required
173      * dependencies are satisfied
174      * 
175      */
176     void init() {
177         logger.trace("Init called");
178         connectorsOverUtilized = new ArrayList<NodeConnector>();
179         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
180         notifyThread = new Thread(new TopologyNotify(notifyQ));
181         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
182         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
183         bulkNotifyQ = new LinkedBlockingQueue<String>();
184         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
185             @Override
186             public void run() {
187                 while (true) {
188                     try {
189                         String containerName = bulkNotifyQ.take();
190                         logger.debug("Bulk Notify container:{}", containerName);
191                         TopologyBulkUpdate(containerName);
192                     } catch (InterruptedException e) {
193                         logger.warn("Topology Bulk update thread interrupted");
194                         if (shuttingDown) {
195                             return;
196                         }
197                     }
198                 }
199             }
200         }, "Topology Bulk Update");
201
202         // Initialize node connector tx bit rate poller timer
203         pollTimer = new Timer();
204         txRatePoller = new TimerTask() {
205             @Override
206             public void run() {
207                 pollTxBitRates();
208             }
209         };
210
211         registerWithOSGIConsole();
212     }
213
214     /**
215      * Continuously polls the transmit bit rate for all the node connectors from
216      * statistics manager and trigger the warning notification upward when the
217      * transmit rate is above a threshold which is a percentage of the edge
218      * bandwidth
219      */
220     protected void pollTxBitRates() {
221         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
222                 .get(GlobalConstants.DEFAULT.toString());
223         if (globalContainerEdges == null) {
224             return;
225         }
226
227         for (NodeConnector connector : globalContainerEdges.keySet()) {
228             // Skip if node connector belongs to production switch
229             if (connector.getType().equals(
230                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
231                 continue;
232             }
233
234             // Get edge for which this node connector is head
235             Pair<Edge, Set<Property>> props = this.edgeMap.get(
236                     GlobalConstants.DEFAULT.toString()).get(connector);
237             // On switch mgr restart the props get reset
238             if (props == null) {
239                 continue;
240             }
241             Set<Property> propSet = props.getRight();
242             if (propSet == null) {
243                 continue;
244             }
245
246             float bw = 0;
247             for (Property prop : propSet) {
248                 if (prop instanceof Bandwidth) {
249                     bw = ((Bandwidth) prop).getValue();
250                     break;
251                 }
252             }
253
254             // Skip if agent did not provide a bandwidth info for the edge
255             if (bw == 0) {
256                 continue;
257             }
258
259             // Compare bandwidth usage
260             Long switchId = (Long) connector.getNode().getID();
261             Short port = (Short) connector.getID();
262             float rate = statsMgr.getTransmitRate(switchId, port);
263             if (rate > bwThresholdFactor * bw) {
264                 if (!connectorsOverUtilized.contains(connector)) {
265                     connectorsOverUtilized.add(connector);
266                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
267                             UpdateType.ADDED));
268                 }
269             } else {
270                 if (connectorsOverUtilized.contains(connector)) {
271                     connectorsOverUtilized.remove(connector);
272                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
273                             UpdateType.REMOVED));
274                 }
275             }
276         }
277
278     }
279
280     /**
281      * Function called by the dependency manager when at least one dependency
282      * become unsatisfied or when the component is shutting down because for
283      * example bundle is being stopped.
284      * 
285      */
286     void destroy() {
287         logger.trace("DESTROY called!");
288         notifyQ = null;
289         notifyThread = null;
290     }
291
292     /**
293      * Function called by dependency manager after "init ()" is called and after
294      * the services provided by the class are registered in the service registry
295      * 
296      */
297     void start() {
298         logger.trace("START called!");
299         notifyThread.start();
300         bwUtilNotifyThread.start();
301         ofPluginTopoBulkUpdate.start();
302         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
303     }
304
305     /**
306      * Function called by the dependency manager before the services exported by
307      * the component are unregistered, this will be followed by a "destroy ()"
308      * calls
309      * 
310      */
311     void stop() {
312         logger.trace("STOP called!");
313         shuttingDown = true;
314         notifyThread.interrupt();
315     }
316
317     void setTopologyServiceShimListener(Map<?, ?> props,
318             ITopologyServiceShimListener s) {
319         if (props == null) {
320             logger.error("Didn't receive the service properties");
321             return;
322         }
323         String containerName = (String) props.get("containerName");
324         if (containerName == null) {
325             logger.error("containerName not supplied");
326             return;
327         }
328         if ((this.topologyServiceShimListeners != null)
329                 && !this.topologyServiceShimListeners
330                         .containsKey(containerName)) {
331             this.topologyServiceShimListeners.put(containerName, s);
332             logger.trace("Added topologyServiceShimListener for container:"
333                     + containerName);
334         }
335     }
336
337     void unsetTopologyServiceShimListener(Map<?, ?> props,
338             ITopologyServiceShimListener s) {
339         if (props == null) {
340             logger.error("Didn't receive the service properties");
341             return;
342         }
343         String containerName = (String) props.get("containerName");
344         if (containerName == null) {
345             logger.error("containerName not supplied");
346             return;
347         }
348         if ((this.topologyServiceShimListeners != null)
349                 && this.topologyServiceShimListeners.containsKey(containerName)
350                 && this.topologyServiceShimListeners.get(containerName).equals(
351                         s)) {
352             this.topologyServiceShimListeners.remove(containerName);
353             logger.trace("Removed topologyServiceShimListener for container: "
354                     + containerName);
355         }
356     }
357
358     void setStatisticsManager(IOFStatisticsManager s) {
359         this.statsMgr = s;
360     }
361
362     void unsetStatisticsManager(IOFStatisticsManager s) {
363         if (this.statsMgr == s) {
364             this.statsMgr = null;
365         }
366     }
367
368     private void removeNodeConnector(String container,
369             NodeConnector nodeConnector) {
370         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
371                 .get(container);
372         if (edgePropsMap == null) {
373             return;
374         }
375
376         // Remove edge in one direction
377         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
378         if (edgeProps == null) {
379             return;
380         }
381         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
382
383         // Remove edge in another direction
384         edgeProps = edgePropsMap
385                 .get(edgeProps.getLeft().getHeadNodeConnector());
386         if (edgeProps == null) {
387             return;
388         }
389         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
390     }
391
392     private void notifyEdge(String container, Edge edge, UpdateType type,
393             Set<Property> props) {
394         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
395                 .get(container);
396         NodeConnector src = edge.getTailNodeConnector();
397         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
398                 edge, props);
399
400         switch (type) {
401         case ADDED:
402         case CHANGED:
403             if (edgePropsMap == null) {
404                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
405             } else {
406                 if (edgePropsMap.containsKey(src)
407                         && edgePropsMap.get(src).equals(edgeProps)) {
408                     // Entry already exists. Return here.
409                     return;
410                 }
411             }
412             edgePropsMap.put(src, edgeProps);
413             edgeMap.put(container, edgePropsMap);
414             break;
415         case REMOVED:
416             if (edgePropsMap != null) {
417                 edgePropsMap.remove(src);
418                 if (edgePropsMap.isEmpty()) {
419                     edgeMap.remove(container);
420                 } else {
421                     edgeMap.put(container, edgePropsMap);
422                 }
423             }
424             break;
425         default:
426             logger.debug("Invalid " + type + " Edge " + edge
427                     + " in container {}", container);
428             return;
429         }
430
431         notifyQ.add(new NotifyEntry(container, edgeProps, type));
432
433         logger.trace(type + " Edge " + edge + " in container {}", container);
434     }
435
436     @Override
437     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
438         if ((edge == null) || (type == null)) {
439             return;
440         }
441
442         // Notify default container
443         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
444
445         // Notify the corresponding containers
446         List<String> containers = getEdgeContainers(edge);
447         if (containers != null) {
448             for (String container : containers) {
449                 notifyEdge(container, edge, type, props);
450             }
451         }
452     }
453
454     /*
455      * Return a list of containers the edge associated with
456      */
457     private List<String> getEdgeContainers(Edge edge) {
458         NodeConnector src = edge.getTailNodeConnector(), dst = edge
459                 .getHeadNodeConnector();
460
461         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
462             /* Find the common containers for both ends */
463             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
464                     .get(dst), cmnContainers = null;
465             if ((srcContainers != null) && (dstContainers != null)) {
466                 cmnContainers = new ArrayList<String>(srcContainers);
467                 cmnContainers.retainAll(dstContainers);
468             }
469             return cmnContainers;
470         } else {
471             /*
472              * If the neighbor is part of a monitored production network, get
473              * the containers that the edge port belongs to
474              */
475             return this.containerMap.get(dst);
476         }
477     }
478
479     @Override
480     public void tagUpdated(String containerName, Node n, short oldTag,
481             short newTag, UpdateType t) {
482     }
483
484     @Override
485     public void containerFlowUpdated(String containerName,
486             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
487     }
488
489     @Override
490     public void nodeConnectorUpdated(String containerName, NodeConnector p,
491             UpdateType t) {
492         if (this.containerMap == null) {
493             logger.error("containerMap is NULL");
494             return;
495         }
496         List<String> containers = this.containerMap.get(p);
497         if (containers == null) {
498             containers = new CopyOnWriteArrayList<String>();
499         }
500         boolean updateMap = false;
501         switch (t) {
502         case ADDED:
503             if (!containers.contains(containerName)) {
504                 containers.add(containerName);
505                 updateMap = true;
506             }
507             break;
508         case REMOVED:
509             if (containers.contains(containerName)) {
510                 containers.remove(containerName);
511                 updateMap = true;
512                 removeNodeConnector(containerName, p);
513             }
514             break;
515         case CHANGED:
516             break;
517         }
518         if (updateMap) {
519             if (containers.isEmpty()) {
520                 // Do cleanup to reduce memory footprint if no
521                 // elements to be tracked
522                 this.containerMap.remove(p);
523             } else {
524                 this.containerMap.put(p, containers);
525             }
526         }
527     }
528
529     @Override
530     public void containerModeUpdated(UpdateType t) {
531         // do nothing
532     }
533
534     private void registerWithOSGIConsole() {
535         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
536                 .getBundleContext();
537         bundleContext.registerService(CommandProvider.class.getName(), this,
538                 null);
539     }
540
541     @Override
542     public String getHelp() {
543         StringBuffer help = new StringBuffer();
544         help.append("---Topology Service Shim---\n");
545         help.append("\t pem [container]               - Print edgeMap entries");
546         help.append(" for a given container\n");
547         return help.toString();
548     }
549
550     public void _pem(CommandInterpreter ci) {
551         String container = ci.nextArgument();
552         if (container == null) {
553             container = GlobalConstants.DEFAULT.toString();
554         }
555
556         ci.println("Container: " + container);
557         ci.println("                             Edge                                          Bandwidth");
558
559         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
560                 .get(container);
561         if (edgePropsMap == null) {
562             return;
563         }
564         int count = 0;
565         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
566             if (edgeProps == null) {
567                 continue;
568             }
569
570             long bw = 0;
571             for (Property prop : edgeProps.getRight()) {
572                 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
573                     bw = ((Bandwidth) prop).getValue();
574                 }
575             }
576             count++;
577             ci.println(edgeProps.getLeft() + "          " + bw);
578         }
579         ci.println("Total number of Edges: " + count);
580     }
581
582     public void _bwfactor(CommandInterpreter ci) {
583         String factorString = ci.nextArgument();
584         if (factorString == null) {
585             ci.println("Bw threshold: " + this.bwThresholdFactor);
586             ci.println("Insert a non null bw threshold");
587             return;
588         }
589         bwThresholdFactor = Float.parseFloat(factorString);
590         ci.println("New Bw threshold: " + this.bwThresholdFactor);
591     }
592
593     /**
594      * This method will trigger topology updates to be sent toward SAL. SAL then
595      * pushes the updates to ALL the applications that have registered as
596      * listeners for this service. SAL has no way of knowing which application
597      * requested for the refresh.
598      * 
599      * As an example of this case, is stopping and starting the Topology
600      * Manager. When the topology Manager is stopped, and restarted, it will no
601      * longer have the latest topology. Hence, a request is sent here.
602      * 
603      * @param containerName
604      * @return void
605      */
606     @Override
607     public void requestRefresh(String containerName) {
608         // wake up a bulk update thread and exit
609         // the thread will execute the bulkUpdate()
610         bulkNotifyQ.add(containerName);
611     }
612
613     /**
614      * Reading the current topology database, the method will replay all the
615      * edge updates for the ITopologyServiceShimListener instance in the given
616      * container, which will in turn publish them toward SAL.
617      * 
618      * @param containerName
619      */
620     private void TopologyBulkUpdate(String containerName) {
621         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
622
623         logger.debug("Try bulk update for container:{}", containerName);
624         edgePropMap = edgeMap.get(containerName);
625         if (edgePropMap == null) {
626             logger.debug("No edges known for container:{}", containerName);
627             return;
628         }
629         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
630                 .get(containerName);
631         if (topologServiceShimListener == null) {
632             logger.debug("No topology service shim listener for container:{}",
633                     containerName);
634             return;
635         }
636         int i = 0;
637         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
638             if (edgeProps != null) {
639                 i++;
640                 logger.trace("Add edge {}", edgeProps.getLeft());
641                 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
642                         UpdateType.ADDED, edgeProps.getRight());
643             }
644         }
645         logger.debug("Sent {} updates", i);
646     }
647
648 }