Refactor frontend JS
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.Timer;
18 import java.util.TimerTask;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import org.apache.commons.lang3.tuple.ImmutablePair;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.eclipse.osgi.framework.console.CommandInterpreter;
28 import org.eclipse.osgi.framework.console.CommandProvider;
29 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
30 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
32 import org.osgi.framework.BundleContext;
33 import org.osgi.framework.FrameworkUtil;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import org.opendaylight.controller.sal.core.Bandwidth;
38 import org.opendaylight.controller.sal.core.ContainerFlow;
39 import org.opendaylight.controller.sal.core.Edge;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.UpdateType;
45 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
46 import org.opendaylight.controller.sal.utils.GlobalConstants;
47
48 /**
49  * The class describes a shim layer that relays the topology events from OpenFlow
50  * core to various listeners. The notifications are filtered based on container
51  * configurations.
52  */
53 public class TopologyServiceShim implements IDiscoveryService,
54         IContainerListener, CommandProvider, IRefreshInternalProvider {
55     protected static final Logger logger = LoggerFactory
56             .getLogger(TopologyServiceShim.class);
57     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
58     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
59     private ConcurrentMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>>();
60
61     private BlockingQueue<NotifyEntry> notifyQ;
62     private Thread notifyThread;
63     private BlockingQueue<String> bulkNotifyQ;
64     private Thread ofPluginTopoBulkUpdate;
65     private volatile Boolean shuttingDown = false;
66     private IOFStatisticsManager statsMgr;
67     private Timer pollTimer;
68     private TimerTask txRatePoller;
69     private Thread bwUtilNotifyThread;
70     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
71     private List<NodeConnector> connectorsOverUtilized;
72     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link bandwidth
73
74     class NotifyEntry {
75         String container;
76         Pair<Edge, Set<Property>> edgeProps;
77         UpdateType type;
78
79         NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
80                 UpdateType type) {
81             this.container = container;
82             this.edgeProps = edgeProps;
83             this.type = type;
84         }
85     }
86
87     class TopologyNotify implements Runnable {
88         private final BlockingQueue<NotifyEntry> notifyQ;
89
90         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
91             this.notifyQ = notifyQ;
92         }
93
94         public void run() {
95             while (true) {
96                 try {
97                     NotifyEntry entry = notifyQ.take();
98
99                     ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
100                             .get(entry.container);
101                     topologServiceShimListener.edgeUpdate(entry.edgeProps
102                             .getLeft(), entry.type, entry.edgeProps.getRight());
103
104                     entry = null;
105                 } catch (InterruptedException e1) {
106                     logger.warn("TopologyNotify interrupted", e1.getMessage());
107                     if (shuttingDown) {
108                         return;
109                     }
110                 } catch (Exception e2) {
111                     e2.printStackTrace();
112                 }
113             }
114         }
115     }
116
117     class UtilizationUpdate {
118         NodeConnector connector;
119         UpdateType type;
120
121         UtilizationUpdate(NodeConnector connector, UpdateType type) {
122             this.connector = connector;
123             this.type = type;
124         }
125     }
126
127     class BwUtilizationNotify implements Runnable {
128         private final BlockingQueue<UtilizationUpdate> notifyQ;
129
130         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
131             this.notifyQ = notifyQ;
132         }
133
134         public void run() {
135             while (true) {
136                 try {
137                     UtilizationUpdate update = notifyQ.take();
138                     NodeConnector connector = update.connector;
139                     Set<String> containerList = edgeMap.keySet();//.get(connector);
140                     for (String container : containerList) {
141                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
142                                 .get(container);
143                         Edge edge = edgePropsMap.get(connector).getLeft();
144                         if (edge.getTailNodeConnector().equals(connector)) {
145                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
146                                     .get(container);
147                             if (update.type == UpdateType.ADDED) {
148                                 topologServiceShimListener
149                                         .edgeOverUtilized(edge);
150                             } else {
151                                 topologServiceShimListener
152                                         .edgeUtilBackToNormal(edge);
153                             }
154                         }
155                     }
156                 } catch (InterruptedException e1) {
157                     logger
158                             .warn(
159                                     "Edge Bandwidth Utilization Notify Thread interrupted",
160                                     e1.getMessage());
161                     if (shuttingDown) {
162                         return;
163                     }
164                 } catch (Exception e2) {
165                     e2.printStackTrace();
166                 }
167             }
168         }
169     }
170
171     /**
172      * Function called by the dependency manager when all the required
173      * dependencies are satisfied
174      *
175      */
176     void init() {
177         logger.trace("Init called");
178         connectorsOverUtilized = new ArrayList<NodeConnector>();
179         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
180         notifyThread = new Thread(new TopologyNotify(notifyQ));
181         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
182         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
183         bulkNotifyQ = new LinkedBlockingQueue<String>();
184         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
185             @Override
186             public void run() {
187                 while (true) {
188                     try {
189                         String containerName = bulkNotifyQ.take();
190                         logger.debug("Bulk Notify container:{}", containerName);
191                         TopologyBulkUpdate(containerName);
192                     } catch (InterruptedException e) {
193                         logger.warn("Topology Bulk update thread interrupted");
194                         if (shuttingDown) {
195                             return;
196                         }
197                     }
198                 }
199             }
200         }, "Topology Bulk Update");
201
202         // Initialize node connector tx bit rate poller timer
203         pollTimer = new Timer();
204         txRatePoller = new TimerTask() {
205             @Override
206             public void run() {
207                 pollTxBitRates();
208             }
209         };
210
211         registerWithOSGIConsole();
212     }
213
214     /**
215      * Continuously polls the transmit bit rate for all the node connectors
216      * from statistics manager and trigger the warning notification upward
217      * when the transmit rate is above a threshold which is a percentage of
218      * the edge bandwidth
219      */
220     protected void pollTxBitRates() {
221         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
222                 .get(GlobalConstants.DEFAULT.toString());
223         if (globalContainerEdges == null) {
224             return;
225         }
226
227         for (NodeConnector connector : globalContainerEdges.keySet()) {
228             // Skip if node connector belongs to production switch
229             if (connector.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
230                 continue;
231             }
232
233             // Get edge for which this node connector is head
234             Pair<Edge, Set<Property>> props = this.edgeMap.get(
235                     GlobalConstants.DEFAULT.toString()).get(connector);
236             // On switch mgr restart the props get reset
237             if (props == null) {
238                 continue;
239             }
240             Set<Property> propSet = props.getRight();
241             if (propSet == null) {
242                 continue;
243             }
244
245             float bw = 0;
246             for (Property prop : propSet) {
247                 if (prop instanceof Bandwidth) {
248                     bw = ((Bandwidth) prop).getValue();
249                     break;
250                 }
251             }
252
253             // Skip if agent did not provide a bandwidth info for the edge
254             if (bw == 0) {
255                 continue;
256             }
257
258             // Compare bandwidth usage
259             Long switchId = (Long) connector.getNode().getID();
260             Short port = (Short) connector.getID();
261             float rate = statsMgr.getTransmitRate(switchId, port);
262             if (rate > bwThresholdFactor * bw) {
263                 if (!connectorsOverUtilized.contains(connector)) {
264                     connectorsOverUtilized.add(connector);
265                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
266                             UpdateType.ADDED));
267                 }
268             } else {
269                 if (connectorsOverUtilized.contains(connector)) {
270                     connectorsOverUtilized.remove(connector);
271                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
272                             UpdateType.REMOVED));
273                 }
274             }
275         }
276
277     }
278
279     /**
280      * Function called by the dependency manager when at least one
281      * dependency become unsatisfied or when the component is shutting
282      * down because for example bundle is being stopped.
283      *
284      */
285     void destroy() {
286         logger.trace("DESTROY called!");
287         notifyQ = null;
288         notifyThread = null;
289     }
290
291     /**
292      * Function called by dependency manager after "init ()" is called
293      * and after the services provided by the class are registered in
294      * the service registry
295      *
296      */
297     void start() {
298         logger.trace("START called!");
299         notifyThread.start();
300         bwUtilNotifyThread.start();
301         ofPluginTopoBulkUpdate.start();
302         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
303     }
304
305     /**
306      * Function called by the dependency manager before the services
307      * exported by the component are unregistered, this will be
308      * followed by a "destroy ()" calls
309      *
310      */
311     void stop() {
312         logger.trace("STOP called!");
313         shuttingDown = true;
314         notifyThread.interrupt();
315     }
316
317     void setTopologyServiceShimListener(Map<?, ?> props,
318             ITopologyServiceShimListener s) {
319         if (props == null) {
320             logger.error("Didn't receive the service properties");
321             return;
322         }
323         String containerName = (String) props.get("containerName");
324         if (containerName == null) {
325             logger.error("containerName not supplied");
326             return;
327         }
328         if ((this.topologyServiceShimListeners != null)
329                 && !this.topologyServiceShimListeners
330                         .containsKey(containerName)) {
331             this.topologyServiceShimListeners.put(containerName, s);
332             logger.trace("Added topologyServiceShimListener for container:"
333                     + containerName);
334         }
335     }
336
337     void unsetTopologyServiceShimListener(Map<?, ?> props,
338             ITopologyServiceShimListener s) {
339         if (props == null) {
340             logger.error("Didn't receive the service properties");
341             return;
342         }
343         String containerName = (String) props.get("containerName");
344         if (containerName == null) {
345             logger.error("containerName not supplied");
346             return;
347         }
348         if ((this.topologyServiceShimListeners != null)
349                 && this.topologyServiceShimListeners
350                 .containsKey(containerName)
351                 && this.topologyServiceShimListeners
352                 .get(containerName).equals(s)
353                 ) {
354             this.topologyServiceShimListeners.remove(containerName);
355             logger.trace("Removed topologyServiceShimListener for container: "
356                     + containerName);
357         }
358     }
359
360     void setStatisticsManager(IOFStatisticsManager s) {
361         this.statsMgr = s;
362     }
363
364     void unsetStatisticsManager(IOFStatisticsManager s) {
365         if (this.statsMgr == s) {
366             this.statsMgr = null;
367         }
368     }
369
370     private void removeNodeConnector(String container,
371             NodeConnector nodeConnector) {
372         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
373                 .get(container);
374         if (edgePropsMap == null) {
375             return;
376         }
377
378         // Remove edge in one direction
379         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
380         if (edgeProps == null) {
381             return;
382         }
383         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
384
385         // Remove edge in another direction
386         edgeProps = edgePropsMap
387                 .get(edgeProps.getLeft().getHeadNodeConnector());
388         if (edgeProps == null) {
389             return;
390         }
391         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
392     }
393
394     private void notifyEdge(String container, Edge edge, UpdateType type,
395             Set<Property> props) {
396         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
397                 .get(container);
398         NodeConnector src = edge.getTailNodeConnector();
399         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
400                 edge, props);
401
402         switch (type) {
403         case ADDED:
404         case CHANGED:
405             if (edgePropsMap == null) {
406                 edgePropsMap = new HashMap<NodeConnector, Pair<Edge, Set<Property>>>();
407             } else {
408                 if (edgePropsMap.containsKey(src)
409                         && edgePropsMap.get(src).equals(edgeProps)) {
410                     // Entry already exists. Return here.
411                     return;
412                 }
413             }
414             edgePropsMap.put(src, edgeProps);
415             edgeMap.put(container, edgePropsMap);
416             break;
417         case REMOVED:
418             if (edgePropsMap != null) {
419                 edgePropsMap.remove(src);
420                 if (edgePropsMap.isEmpty()) {
421                     edgeMap.remove(container);
422                 } else {
423                     edgeMap.put(container, edgePropsMap);
424                 }
425             }
426             break;
427         default:
428             logger.debug("Invalid " + type + " Edge " + edge
429                     + " in container {}", container);
430             return;
431         }
432
433         notifyQ.add(new NotifyEntry(container, edgeProps, type));
434
435         logger.trace(type + " Edge " + edge + " in container {}", container);
436     }
437
438     @Override
439     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
440         if ((edge == null) || (type == null)) {
441             return;
442         }
443
444         // Notify default container
445         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
446
447         // Notify the corresponding containers
448         List<String> containers = getEdgeContainers(edge);
449         if (containers != null) {
450             for (String container : containers) {
451                 notifyEdge(container, edge, type, props);
452             }
453         }
454     }
455
456     /*
457      * Return a list of containers the edge associated with
458      */
459     private List<String> getEdgeContainers(Edge edge) {
460         NodeConnector src = edge.getTailNodeConnector(), dst = edge
461                 .getHeadNodeConnector();
462
463         if (!src.getType().equals(
464                 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
465             /* Find the common containers for both ends */
466             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
467                     .get(dst), cmnContainers = null;
468             if ((srcContainers != null) && (dstContainers != null)) {
469                 cmnContainers = new ArrayList<String>(srcContainers);
470                 cmnContainers.retainAll(dstContainers);
471             }
472             return cmnContainers;
473         } else {
474             /*
475              * If the neighbor is part of a monitored production network, get
476              * the containers that the edge port belongs to
477              */
478             return this.containerMap.get(dst);
479         }
480     }
481
482     @Override
483     public void tagUpdated(String containerName, Node n, short oldTag,
484             short newTag, UpdateType t) {
485     }
486
487     @Override
488     public void containerFlowUpdated(String containerName,
489             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
490     }
491
492     @Override
493     public void nodeConnectorUpdated(String containerName, NodeConnector p,
494             UpdateType t) {
495         if (this.containerMap == null) {
496             logger.error("containerMap is NULL");
497             return;
498         }
499         List<String> containers = this.containerMap.get(p);
500         if (containers == null) {
501             containers = new CopyOnWriteArrayList<String>();
502         }
503         boolean updateMap = false;
504         switch (t) {
505         case ADDED:
506             if (!containers.contains(containerName)) {
507                 containers.add(containerName);
508                 updateMap = true;
509             }
510             break;
511         case REMOVED:
512             if (containers.contains(containerName)) {
513                 containers.remove(containerName);
514                 updateMap = true;
515                 removeNodeConnector(containerName, p);
516             }
517             break;
518         case CHANGED:
519             break;
520         }
521         if (updateMap) {
522             if (containers.isEmpty()) {
523                 // Do cleanup to reduce memory footprint if no
524                 // elements to be tracked
525                 this.containerMap.remove(p);
526             } else {
527                 this.containerMap.put(p, containers);
528             }
529         }
530     }
531
532     @Override
533     public void containerModeUpdated(UpdateType t) {
534         // do nothing
535     }
536
537     private void registerWithOSGIConsole() {
538         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
539                 .getBundleContext();
540         bundleContext.registerService(CommandProvider.class.getName(), this,
541                 null);
542     }
543
544     @Override
545     public String getHelp() {
546         StringBuffer help = new StringBuffer();
547         help.append("---Topology Service Shim---\n");
548         help
549                 .append("\t pem [container]               - Print edgeMap entries for a given container\n");
550         return help.toString();
551     }
552
553     public void _pem(CommandInterpreter ci) {
554         String container = ci.nextArgument();
555         if (container == null) {
556             container = GlobalConstants.DEFAULT.toString();
557         }
558
559         ci.println("Container: " + container);
560         ci.println("                             Edge                                          Bandwidth");
561
562         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
563                 .get(container);
564         if (edgePropsMap == null) {
565             return;
566         }
567         int count = 0;
568         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
569             if (edgeProps == null) {
570                 continue;
571             }
572
573             long bw = 0;
574             for (Property prop : edgeProps.getRight()) {
575                 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
576                     bw = ((Bandwidth) prop).getValue();
577                 }
578             }
579             count++;
580             ci.println(edgeProps.getLeft() + "          " + bw);
581         }
582         ci.println("Total number of Edges: " + count);
583     }
584
585     public void _bwfactor(CommandInterpreter ci) {
586         String factorString = ci.nextArgument();
587         if (factorString == null) {
588             ci.println("Bw threshold: " + this.bwThresholdFactor);
589             ci.println("Insert a non null bw threshold");
590             return;
591         }
592         bwThresholdFactor = Float.parseFloat(factorString);
593         ci.println("New Bw threshold: " + this.bwThresholdFactor);
594     }
595
596     /**
597      * This method will trigger topology updates to be sent
598      * toward SAL.  SAL then pushes the updates to ALL the applications
599      * that have registered as listeners for this service.  SAL has no
600      * way of knowing which application requested for the refresh.
601      *
602      * As an example of this case, is stopping and starting the
603      * Topology Manager.  When the topology Manager is stopped,
604      * and restarted, it will no longer have the latest topology.
605      * Hence, a request is sent here.
606      *
607      * @param containerName
608      * @return void
609      */
610     @Override
611     public void requestRefresh(String containerName) {
612         // wake up a bulk update thread and exit
613         // the thread will execute the bulkUpdate()
614         bulkNotifyQ.add(containerName);
615     }
616
617     /**
618      * Reading the current topology database, the method will replay
619      * all the edge updates for the ITopologyServiceShimListener instance
620      * in the given container, which will in turn publish them toward SAL.
621      * @param containerName
622      */
623     private void TopologyBulkUpdate(String containerName) {
624         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
625
626         logger.debug("Try bulk update for container:{}", containerName);
627         edgePropMap = edgeMap.get(containerName);
628         if (edgePropMap == null) {
629             logger.debug("No edges known for container:{}", containerName);
630             return;
631         }
632         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
633                 .get(containerName);
634         if (topologServiceShimListener == null) {
635             logger.debug("No topology service shim listener for container:{}",
636                     containerName);
637             return;
638         }
639         int i = 0;
640         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
641             if (edgeProps != null) {
642                 i++;
643                 logger.trace("Add edge {}", edgeProps.getLeft());
644                 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
645                         UpdateType.ADDED, edgeProps.getRight());
646             }
647         }
648         logger.debug("Sent {} updates", i);
649     }
650
651 }