OpenDaylight Controller functional modules.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.Timer;
18 import java.util.TimerTask;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import org.apache.commons.lang3.tuple.ImmutablePair;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.eclipse.osgi.framework.console.CommandInterpreter;
28 import org.eclipse.osgi.framework.console.CommandProvider;
29 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
30 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
32 import org.osgi.framework.BundleContext;
33 import org.osgi.framework.FrameworkUtil;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import org.opendaylight.controller.sal.core.Bandwidth;
38 import org.opendaylight.controller.sal.core.ContainerFlow;
39 import org.opendaylight.controller.sal.core.Edge;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.UpdateType;
45 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
46 import org.opendaylight.controller.sal.utils.GlobalConstants;
47
48 /**
49  * The class describes a shim layer that relays the topology events from OpenFlow
50  * core to various listeners. The notifications are filtered based on container
51  * configurations.
52  */
53 public class TopologyServiceShim implements IDiscoveryService,
54         IContainerListener, CommandProvider, IRefreshInternalProvider {
55     protected static final Logger logger = LoggerFactory
56             .getLogger(TopologyServiceShim.class);
57     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
58     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
59     private ConcurrentMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>>();
60
61     private BlockingQueue<NotifyEntry> notifyQ;
62     private Thread notifyThread;
63     private BlockingQueue<String> bulkNotifyQ;
64     private Thread ofPluginTopoBulkUpdate;
65     private volatile Boolean shuttingDown = false;
66     private IOFStatisticsManager statsMgr;
67     private Timer pollTimer;
68     private TimerTask txRatePoller;
69     private Thread bwUtilNotifyThread;
70     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
71     private List<NodeConnector> connectorsOverUtilized;
72     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link bandwidth
73
74     class NotifyEntry {
75         String container;
76         Pair<Edge, Set<Property>> edgeProps;
77         UpdateType type;
78
79         NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
80                 UpdateType type) {
81             this.container = container;
82             this.edgeProps = edgeProps;
83             this.type = type;
84         }
85     }
86
87     class TopologyNotify implements Runnable {
88         private final BlockingQueue<NotifyEntry> notifyQ;
89
90         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
91             this.notifyQ = notifyQ;
92         }
93
94         public void run() {
95             while (true) {
96                 try {
97                     NotifyEntry entry = notifyQ.take();
98
99                     ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
100                             .get(entry.container);
101                     topologServiceShimListener.edgeUpdate(entry.edgeProps
102                             .getLeft(), entry.type, entry.edgeProps.getRight());
103
104                     entry = null;
105                 } catch (InterruptedException e1) {
106                     logger.warn("TopologyNotify interrupted", e1.getMessage());
107                     if (shuttingDown) {
108                         return;
109                     }
110                 } catch (Exception e2) {
111                     e2.printStackTrace();
112                 }
113             }
114         }
115     }
116
117     class UtilizationUpdate {
118         NodeConnector connector;
119         UpdateType type;
120
121         UtilizationUpdate(NodeConnector connector, UpdateType type) {
122             this.connector = connector;
123             this.type = type;
124         }
125     }
126
127     class BwUtilizationNotify implements Runnable {
128         private final BlockingQueue<UtilizationUpdate> notifyQ;
129
130         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
131             this.notifyQ = notifyQ;
132         }
133
134         public void run() {
135             while (true) {
136                 try {
137                     UtilizationUpdate update = notifyQ.take();
138                     NodeConnector connector = update.connector;
139                     Set<String> containerList = edgeMap.keySet();//.get(connector);
140                     for (String container : containerList) {
141                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
142                                 .get(container);
143                         Edge edge = edgePropsMap.get(connector).getLeft();
144                         if (edge.getTailNodeConnector().equals(connector)) {
145                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
146                                     .get(container);
147                             if (update.type == UpdateType.ADDED) {
148                                 topologServiceShimListener
149                                         .edgeOverUtilized(edge);
150                             } else {
151                                 topologServiceShimListener
152                                         .edgeUtilBackToNormal(edge);
153                             }
154                         }
155                     }
156                 } catch (InterruptedException e1) {
157                     logger
158                             .warn(
159                                     "Edge Bandwidth Utilization Notify Thread interrupted",
160                                     e1.getMessage());
161                     if (shuttingDown) {
162                         return;
163                     }
164                 } catch (Exception e2) {
165                     e2.printStackTrace();
166                 }
167             }
168         }
169     }
170
171     /**
172      * Function called by the dependency manager when all the required
173      * dependencies are satisfied
174      *
175      */
176     void init() {
177         logger.trace("Init called");
178         connectorsOverUtilized = new ArrayList<NodeConnector>();
179         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
180         notifyThread = new Thread(new TopologyNotify(notifyQ));
181         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
182         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
183         bulkNotifyQ = new LinkedBlockingQueue<String>();
184         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
185             @Override
186             public void run() {
187                 while (true) {
188                     try {
189                         String containerName = bulkNotifyQ.take();
190                         logger.debug("Bulk Notify container:{}", containerName);
191                         TopologyBulkUpdate(containerName);
192                     } catch (InterruptedException e) {
193                         logger.warn("Topology Bulk update thread interrupted");
194                         if (shuttingDown) {
195                             return;
196                         }
197                     }
198                 }
199             }
200         }, "Topology Bulk Update");
201
202         // Initialize node connector tx bit rate poller timer
203         pollTimer = new Timer();
204         txRatePoller = new TimerTask() {
205             @Override
206             public void run() {
207                 pollTxBitRates();
208             }
209         };
210
211         registerWithOSGIConsole();
212     }
213
214     /**
215      * Continuously polls the transmit bit rate for all the node connectors
216      * from statistics manager and trigger the warning notification upward
217      * when the transmit rate is above a threshold which is a percentage of
218      * the edge bandwidth
219      */
220     protected void pollTxBitRates() {
221         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
222                 .get(GlobalConstants.DEFAULT.toString());
223         if (globalContainerEdges == null) {
224             return;
225         }
226
227         for (NodeConnector connector : globalContainerEdges.keySet()) {
228             // Skip if node connector belongs to production switch
229             if (connector.getType() == NodeConnector.NodeConnectorIDType.PRODUCTION) {
230                 continue;
231             }
232
233             // Get edge for which this node connector is head
234             Pair<Edge, Set<Property>> props = this.edgeMap.get(
235                     GlobalConstants.DEFAULT.toString()).get(connector);
236             // On switch mgr restart the props get reset
237             if (props == null) {
238                 continue;
239             }
240             Set<Property> propSet = props.getRight();
241             if (propSet == null) {
242                 continue;
243             }
244
245             float bw = 0;
246             for (Property prop : propSet) {
247                 if (prop instanceof Bandwidth) {
248                     bw = ((Bandwidth) prop).getValue();
249                     break;
250                 }
251             }
252
253             // Skip if agent did not provide a bandwidth info for the edge
254             if (bw == 0) {
255                 continue;
256             }
257
258             // Compare bandwidth usage
259             Long switchId = (Long) connector.getNode().getID();
260             Short port = (Short) connector.getID();
261             float rate = statsMgr.getTransmitRate(switchId, port);
262             if (rate > bwThresholdFactor * bw) {
263                 if (!connectorsOverUtilized.contains(connector)) {
264                     connectorsOverUtilized.add(connector);
265                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
266                             UpdateType.ADDED));
267                 }
268             } else {
269                 if (connectorsOverUtilized.contains(connector)) {
270                     connectorsOverUtilized.remove(connector);
271                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
272                             UpdateType.REMOVED));
273                 }
274             }
275         }
276
277     }
278
279     /**
280      * Function called by the dependency manager when at least one
281      * dependency become unsatisfied or when the component is shutting
282      * down because for example bundle is being stopped.
283      *
284      */
285     void destroy() {
286         logger.trace("DESTROY called!");
287         notifyQ = null;
288         notifyThread = null;
289     }
290
291     /**
292      * Function called by dependency manager after "init ()" is called
293      * and after the services provided by the class are registered in
294      * the service registry
295      *
296      */
297     void start() {
298         logger.trace("START called!");
299         notifyThread.start();
300         bwUtilNotifyThread.start();
301         ofPluginTopoBulkUpdate.start();
302         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
303     }
304
305     /**
306      * Function called by the dependency manager before the services
307      * exported by the component are unregistered, this will be
308      * followed by a "destroy ()" calls
309      *
310      */
311     void stop() {
312         logger.trace("STOP called!");
313         shuttingDown = true;
314         notifyThread.interrupt();
315     }
316
317     void setTopologyServiceShimListener(Map<?, ?> props,
318             ITopologyServiceShimListener s) {
319         if (props == null) {
320             logger.error("Didn't receive the service properties");
321             return;
322         }
323         String containerName = (String) props.get("containerName");
324         if (containerName == null) {
325             logger.error("containerName not supplied");
326             return;
327         }
328         if ((this.topologyServiceShimListeners != null)
329                 && !this.topologyServiceShimListeners.containsKey(s)) {
330             this.topologyServiceShimListeners.put(containerName, s);
331             logger.trace("Added topologyServiceShimListener for container:"
332                     + containerName);
333         }
334     }
335
336     void unsetTopologyServiceShimListener(Map<?, ?> props,
337             ITopologyServiceShimListener s) {
338         if (props == null) {
339             logger.error("Didn't receive the service properties");
340             return;
341         }
342         String containerName = (String) props.get("containerName");
343         if (containerName == null) {
344             logger.error("containerName not supplied");
345             return;
346         }
347         if ((this.topologyServiceShimListeners != null)
348                 && this.topologyServiceShimListeners.containsKey(s)) {
349             this.topologyServiceShimListeners.remove(containerName);
350             logger.trace("Removed topologyServiceShimListener for container: "
351                     + containerName);
352         }
353     }
354
355     void setStatisticsManager(IOFStatisticsManager s) {
356         this.statsMgr = s;
357     }
358
359     void unsetStatisticsManager(IOFStatisticsManager s) {
360         if (this.statsMgr == s) {
361             this.statsMgr = null;
362         }
363     }
364
365     private void removeNodeConnector(String container,
366             NodeConnector nodeConnector) {
367         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
368                 .get(container);
369         if (edgePropsMap == null) {
370             return;
371         }
372
373         // Remove edge in one direction
374         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
375         if (edgeProps == null) {
376             return;
377         }
378         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
379
380         // Remove edge in another direction
381         edgeProps = edgePropsMap
382                 .get(edgeProps.getLeft().getHeadNodeConnector());
383         if (edgeProps == null) {
384             return;
385         }
386         notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
387     }
388
389     private void notifyEdge(String container, Edge edge, UpdateType type,
390             Set<Property> props) {
391         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
392                 .get(container);
393         NodeConnector src = edge.getTailNodeConnector();
394         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
395                 edge, props);
396
397         switch (type) {
398         case ADDED:
399         case CHANGED:
400             if (edgePropsMap == null) {
401                 edgePropsMap = new HashMap<NodeConnector, Pair<Edge, Set<Property>>>();
402             } else {
403                 if (edgePropsMap.containsKey(src)
404                         && edgePropsMap.get(src).equals(edgeProps)) {
405                     // Entry already exists. Return here.
406                     return;
407                 }
408             }
409             edgePropsMap.put(src, edgeProps);
410             edgeMap.put(container, edgePropsMap);
411             break;
412         case REMOVED:
413             if (edgePropsMap != null) {
414                 edgePropsMap.remove(src);
415                 if (edgePropsMap.isEmpty()) {
416                     edgeMap.remove(container);
417                 } else {
418                     edgeMap.put(container, edgePropsMap);
419                 }
420             }
421             break;
422         default:
423             logger.debug("Invalid " + type + " Edge " + edge
424                     + " in container {}", container);
425             return;
426         }
427
428         notifyQ.add(new NotifyEntry(container, edgeProps, type));
429
430         logger.trace(type + " Edge " + edge + " in container {}", container);
431     }
432
433     @Override
434     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
435         if ((edge == null) || (type == null)) {
436             return;
437         }
438
439         // Notify default container
440         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
441
442         // Notify the corresponding containers
443         List<String> containers = getEdgeContainers(edge);
444         if (containers != null) {
445             for (String container : containers) {
446                 notifyEdge(container, edge, type, props);
447             }
448         }
449     }
450
451     /*
452      * Return a list of containers the edge associated with
453      */
454     private List<String> getEdgeContainers(Edge edge) {
455         NodeConnector src = edge.getTailNodeConnector(), dst = edge
456                 .getHeadNodeConnector();
457
458         if (!src.getType().equals(
459                 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
460             /* Find the common containers for both ends */
461             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
462                     .get(dst), cmnContainers = null;
463             if ((srcContainers != null) && (dstContainers != null)) {
464                 cmnContainers = new ArrayList<String>(srcContainers);
465                 cmnContainers.retainAll(dstContainers);
466             }
467             return cmnContainers;
468         } else {
469             /*
470              * If the neighbor is part of a monitored production network, get
471              * the containers that the edge port belongs to
472              */
473             return this.containerMap.get(dst);
474         }
475     }
476
477     @Override
478     public void tagUpdated(String containerName, Node n, short oldTag,
479             short newTag, UpdateType t) {
480     }
481
482     @Override
483     public void containerFlowUpdated(String containerName,
484             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
485     }
486
487     @Override
488     public void nodeConnectorUpdated(String containerName, NodeConnector p,
489             UpdateType t) {
490         if (this.containerMap == null) {
491             logger.error("containerMap is NULL");
492             return;
493         }
494         List<String> containers = this.containerMap.get(p);
495         if (containers == null) {
496             containers = new CopyOnWriteArrayList<String>();
497         }
498         boolean updateMap = false;
499         switch (t) {
500         case ADDED:
501             if (!containers.contains(containerName)) {
502                 containers.add(containerName);
503                 updateMap = true;
504             }
505             break;
506         case REMOVED:
507             if (containers.contains(containerName)) {
508                 containers.remove(containerName);
509                 updateMap = true;
510                 removeNodeConnector(containerName, p);
511             }
512             break;
513         case CHANGED:
514             break;
515         }
516         if (updateMap) {
517             if (containers.isEmpty()) {
518                 // Do cleanup to reduce memory footprint if no
519                 // elements to be tracked
520                 this.containerMap.remove(p);
521             } else {
522                 this.containerMap.put(p, containers);
523             }
524         }
525     }
526
527     @Override
528     public void containerModeUpdated(UpdateType t) {
529         // do nothing
530     }
531
532     private void registerWithOSGIConsole() {
533         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
534                 .getBundleContext();
535         bundleContext.registerService(CommandProvider.class.getName(), this,
536                 null);
537     }
538
539     @Override
540     public String getHelp() {
541         StringBuffer help = new StringBuffer();
542         help.append("---Topology Service Shim---\n");
543         help
544                 .append("\t pem [container]               - Print edgeMap entries for a given container\n");
545         return help.toString();
546     }
547
548     public void _pem(CommandInterpreter ci) {
549         String container = ci.nextArgument();
550         if (container == null) {
551             container = GlobalConstants.DEFAULT.toString();
552         }
553
554         ci.println("Container: " + container);
555         ci
556                 .println("                             Edge                                          Bandwidth");
557
558         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
559                 .get(container);
560         if (edgePropsMap == null) {
561             return;
562         }
563         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
564             if (edgeProps == null) {
565                 continue;
566             }
567
568             long bw = 0;
569             for (Property prop : edgeProps.getRight()) {
570                 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
571                     bw = ((Bandwidth) prop).getValue();
572                 }
573             }
574
575             ci.println(edgeProps.getLeft() + "          " + bw);
576         }
577     }
578
579     public void _bwfactor(CommandInterpreter ci) {
580         String factorString = ci.nextArgument();
581         if (factorString == null) {
582             ci.println("Bw threshold: " + this.bwThresholdFactor);
583             ci.println("Insert a non null bw threshold");
584             return;
585         }
586         bwThresholdFactor = Float.parseFloat(factorString);
587         ci.println("New Bw threshold: " + this.bwThresholdFactor);
588     }
589
590     /**
591      * This method will trigger topology updates to be sent
592      * toward SAL.  SAL then pushes the updates to ALL the applications
593      * that have registered as listeners for this service.  SAL has no
594      * way of knowing which application requested for the refresh.
595      *
596      * As an example of this case, is stopping and starting the
597      * Topology Manager.  When the topology Manager is stopped,
598      * and restarted, it will no longer have the latest topology.
599      * Hence, a request is sent here.
600      *
601      * @param containerName
602      * @return void
603      */
604     @Override
605     public void requestRefresh(String containerName) {
606         // wake up a bulk update thread and exit
607         // the thread will execute the bulkUpdate()
608         bulkNotifyQ.add(containerName);
609     }
610
611     /**
612      * Reading the current topology database, the method will replay
613      * all the edge updates for the ITopologyServiceShimListener instance
614      * in the given container, which will in turn publish them toward SAL.
615      * @param containerName
616      */
617     private void TopologyBulkUpdate(String containerName) {
618         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
619
620         logger.debug("Try bulk update for container:{}", containerName);
621         edgePropMap = edgeMap.get(containerName);
622         if (edgePropMap == null) {
623             logger.debug("No edges known for container:{}", containerName);
624             return;
625         }
626         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
627                 .get(containerName);
628         if (topologServiceShimListener == null) {
629             logger.debug("No topology service shim listener for container:{}",
630                     containerName);
631             return;
632         }
633         int i = 0;
634         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
635             if (edgeProps != null) {
636                 i++;
637                 logger.trace("Add edge {}", edgeProps.getLeft());
638                 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
639                         UpdateType.ADDED, edgeProps.getRight());
640             }
641         }
642         logger.debug("Sent {} updates", i);
643     }
644
645 }