Fix for cache cleanup in protocol plugin on container deletion
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.Timer;
19 import java.util.TimerTask;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang3.tuple.ImmutablePair;
27 import org.apache.commons.lang3.tuple.Pair;
28 import org.eclipse.osgi.framework.console.CommandInterpreter;
29 import org.eclipse.osgi.framework.console.CommandProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
34 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
35 import org.opendaylight.controller.sal.core.Bandwidth;
36 import org.opendaylight.controller.sal.core.Config;
37 import org.opendaylight.controller.sal.core.ContainerFlow;
38 import org.opendaylight.controller.sal.core.Edge;
39 import org.opendaylight.controller.sal.core.IContainerAware;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.State;
45 import org.opendaylight.controller.sal.core.UpdateType;
46 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.osgi.framework.BundleContext;
49 import org.osgi.framework.FrameworkUtil;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * The class describes a shim layer that relays the topology events from
55  * OpenFlow core to various listeners. The notifications are filtered based on
56  * container configurations.
57  */
58 public class TopologyServiceShim implements IDiscoveryListener,
59         IContainerListener, CommandProvider, IRefreshInternalProvider,
60         IInventoryShimExternalListener, IContainerAware {
61     protected static final Logger logger = LoggerFactory
62             .getLogger(TopologyServiceShim.class);
63     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
64     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
65     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
66
67     private BlockingQueue<NotifyEntry> notifyQ;
68     private Thread notifyThread;
69     private BlockingQueue<String> bulkNotifyQ;
70     private Thread ofPluginTopoBulkUpdate;
71     private volatile Boolean shuttingDown = false;
72     private IOFStatisticsManager statsMgr;
73     private Timer pollTimer;
74     private TimerTask txRatePoller;
75     private Thread bwUtilNotifyThread;
76     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
77     private List<NodeConnector> connectorsOverUtilized;
78     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
79                                                    // bandwidth
80
81     class NotifyEntry {
82         String container;
83         List<TopoEdgeUpdate> teuList;
84
85         public NotifyEntry(String container, TopoEdgeUpdate teu) {
86             this.container = container;
87             this.teuList = new ArrayList<TopoEdgeUpdate>();
88             if (teu != null) {
89                 this.teuList.add(teu);
90             }
91         }
92
93         public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
94             this.container = container;
95             this.teuList = new ArrayList<TopoEdgeUpdate>();
96             if (teuList != null) {
97                 this.teuList.addAll(teuList);
98             }
99         }
100     }
101
102     class TopologyNotify implements Runnable {
103         private final BlockingQueue<NotifyEntry> notifyQ;
104         private NotifyEntry entry;
105         private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
106         private List<TopoEdgeUpdate> teuList;
107         private boolean notifyListeners;
108
109         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
110             this.notifyQ = notifyQ;
111         }
112
113         @Override
114         public void run() {
115             while (true) {
116                 try {
117                     teuMap.clear();
118                     notifyListeners = false;
119                     while (!notifyQ.isEmpty()) {
120                         entry = notifyQ.take();
121                         teuList = teuMap.get(entry.container);
122                         if (teuList == null) {
123                             teuList = new ArrayList<TopoEdgeUpdate>();
124                         }
125                         // group all the updates together
126                         teuList.addAll(entry.teuList);
127                         teuMap.put(entry.container, teuList);
128                         notifyListeners = true;
129                     }
130
131                     if (notifyListeners) {
132                         for (String container : teuMap.keySet()) {
133                             // notify the listener
134                             topologyServiceShimListeners.get(container)
135                                     .edgeUpdate(teuMap.get(container));
136                         }
137                     }
138
139                     Thread.sleep(100);
140                 } catch (InterruptedException e1) {
141                     logger.warn("TopologyNotify interrupted {}",
142                             e1.getMessage());
143                     if (shuttingDown) {
144                         return;
145                     }
146                 } catch (Exception e2) {
147                     logger.error("", e2);
148                 }
149             }
150         }
151     }
152
153     class UtilizationUpdate {
154         NodeConnector connector;
155         UpdateType type;
156
157         UtilizationUpdate(NodeConnector connector, UpdateType type) {
158             this.connector = connector;
159             this.type = type;
160         }
161     }
162
163     class BwUtilizationNotify implements Runnable {
164         private final BlockingQueue<UtilizationUpdate> notifyQ;
165
166         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
167             this.notifyQ = notifyQ;
168         }
169
170         @Override
171         public void run() {
172             while (true) {
173                 try {
174                     UtilizationUpdate update = notifyQ.take();
175                     NodeConnector connector = update.connector;
176                     Set<String> containerList = edgeMap.keySet();
177                     for (String container : containerList) {
178                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
179                                 .get(container);
180                         Edge edge = edgePropsMap.get(connector).getLeft();
181                         if (edge.getTailNodeConnector().equals(connector)) {
182                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
183                                     .get(container);
184                             if (update.type == UpdateType.ADDED) {
185                                 topologServiceShimListener
186                                         .edgeOverUtilized(edge);
187                             } else {
188                                 topologServiceShimListener
189                                         .edgeUtilBackToNormal(edge);
190                             }
191                         }
192                     }
193                 } catch (InterruptedException e1) {
194                     logger.warn(
195                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
196                             e1.getMessage());
197                     if (shuttingDown) {
198                         return;
199                     }
200                 } catch (Exception e2) {
201                     logger.error("", e2);
202                 }
203             }
204         }
205     }
206
207     /**
208      * Function called by the dependency manager when all the required
209      * dependencies are satisfied
210      *
211      */
212     void init() {
213         logger.trace("Init called");
214         connectorsOverUtilized = new ArrayList<NodeConnector>();
215         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
216         notifyThread = new Thread(new TopologyNotify(notifyQ));
217         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
218         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
219         bulkNotifyQ = new LinkedBlockingQueue<String>();
220         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
221             @Override
222             public void run() {
223                 while (true) {
224                     try {
225                         String containerName = bulkNotifyQ.take();
226                         logger.debug("Bulk Notify container:{}", containerName);
227                         TopologyBulkUpdate(containerName);
228                     } catch (InterruptedException e) {
229                         logger.warn("Topology Bulk update thread interrupted");
230                         if (shuttingDown) {
231                             return;
232                         }
233                     }
234                 }
235             }
236         }, "Topology Bulk Update");
237
238         // Initialize node connector tx bit rate poller timer
239         pollTimer = new Timer();
240         txRatePoller = new TimerTask() {
241             @Override
242             public void run() {
243                 pollTxBitRates();
244             }
245         };
246
247         registerWithOSGIConsole();
248     }
249
250     /**
251      * Continuously polls the transmit bit rate for all the node connectors from
252      * statistics manager and trigger the warning notification upward when the
253      * transmit rate is above a threshold which is a percentage of the edge
254      * bandwidth
255      */
256     protected void pollTxBitRates() {
257         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
258                 .get(GlobalConstants.DEFAULT.toString());
259         if (globalContainerEdges == null) {
260             return;
261         }
262
263         for (NodeConnector connector : globalContainerEdges.keySet()) {
264             // Skip if node connector belongs to production switch
265             if (connector.getType().equals(
266                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
267                 continue;
268             }
269
270             // Get edge for which this node connector is head
271             Pair<Edge, Set<Property>> props = this.edgeMap.get(
272                     GlobalConstants.DEFAULT.toString()).get(connector);
273             // On switch mgr restart the props get reset
274             if (props == null) {
275                 continue;
276             }
277             Set<Property> propSet = props.getRight();
278             if (propSet == null) {
279                 continue;
280             }
281
282             float bw = 0;
283             for (Property prop : propSet) {
284                 if (prop instanceof Bandwidth) {
285                     bw = ((Bandwidth) prop).getValue();
286                     break;
287                 }
288             }
289
290             // Skip if agent did not provide a bandwidth info for the edge
291             if (bw == 0) {
292                 continue;
293             }
294
295             // Compare bandwidth usage
296             Long switchId = (Long) connector.getNode().getID();
297             Short port = (Short) connector.getID();
298             float rate = statsMgr.getTransmitRate(switchId, port);
299             if (rate > bwThresholdFactor * bw) {
300                 if (!connectorsOverUtilized.contains(connector)) {
301                     connectorsOverUtilized.add(connector);
302                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
303                             UpdateType.ADDED));
304                 }
305             } else {
306                 if (connectorsOverUtilized.contains(connector)) {
307                     connectorsOverUtilized.remove(connector);
308                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
309                             UpdateType.REMOVED));
310                 }
311             }
312         }
313
314     }
315
316     /**
317      * Function called by the dependency manager when at least one dependency
318      * become unsatisfied or when the component is shutting down because for
319      * example bundle is being stopped.
320      *
321      */
322     void destroy() {
323         logger.trace("DESTROY called!");
324         notifyQ = null;
325         notifyThread = null;
326     }
327
328     /**
329      * Function called by dependency manager after "init ()" is called and after
330      * the services provided by the class are registered in the service registry
331      *
332      */
333     void start() {
334         logger.trace("START called!");
335         notifyThread.start();
336         bwUtilNotifyThread.start();
337         ofPluginTopoBulkUpdate.start();
338         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
339     }
340
341     /**
342      * Function called by the dependency manager before the services exported by
343      * the component are unregistered, this will be followed by a "destroy ()"
344      * calls
345      *
346      */
347     void stop() {
348         logger.trace("STOP called!");
349         shuttingDown = true;
350         notifyThread.interrupt();
351     }
352
353     void setTopologyServiceShimListener(Map<?, ?> props,
354             ITopologyServiceShimListener s) {
355         if (props == null) {
356             logger.error("Didn't receive the service properties");
357             return;
358         }
359         String containerName = (String) props.get("containerName");
360         if (containerName == null) {
361             logger.error("containerName not supplied");
362             return;
363         }
364         if ((this.topologyServiceShimListeners != null)
365                 && !this.topologyServiceShimListeners
366                         .containsKey(containerName)) {
367             this.topologyServiceShimListeners.put(containerName, s);
368             logger.trace("Added topologyServiceShimListener for container: {}",
369                     containerName);
370         }
371     }
372
373     void unsetTopologyServiceShimListener(Map<?, ?> props,
374             ITopologyServiceShimListener s) {
375         if (props == null) {
376             logger.error("Didn't receive the service properties");
377             return;
378         }
379         String containerName = (String) props.get("containerName");
380         if (containerName == null) {
381             logger.error("containerName not supplied");
382             return;
383         }
384         if ((this.topologyServiceShimListeners != null)
385                 && this.topologyServiceShimListeners.containsKey(containerName)
386                 && this.topologyServiceShimListeners.get(containerName).equals(
387                         s)) {
388             this.topologyServiceShimListeners.remove(containerName);
389             logger.trace(
390                     "Removed topologyServiceShimListener for container: {}",
391                     containerName);
392         }
393     }
394
395     void setStatisticsManager(IOFStatisticsManager s) {
396         this.statsMgr = s;
397     }
398
399     void unsetStatisticsManager(IOFStatisticsManager s) {
400         if (this.statsMgr == s) {
401             this.statsMgr = null;
402         }
403     }
404
405     private void updateContainerMap(List<String> containers, NodeConnector p) {
406         if (containers.isEmpty()) {
407             // Do cleanup to reduce memory footprint if no
408             // elements to be tracked
409             this.containerMap.remove(p);
410         } else {
411             this.containerMap.put(p, containers);
412         }
413     }
414
415     /**
416      * From a given edge map, retrieve the edge sourced by the port and update
417      * the local cache in the container
418      *
419      * @param container
420      *            the container name
421      * @param nodeConnector
422      *            the node connector
423      * @param edges
424      *            the given edge map
425      * @return the found edge
426      */
427     private Edge addEdge(String container, NodeConnector nodeConnector,
428             Map<NodeConnector, Pair<Edge, Set<Property>>> edges) {
429         logger.debug("Search edge sourced by port {} in container {}", nodeConnector, container);
430
431         // Retrieve the associated edge
432         Pair<Edge, Set<Property>> edgeProps = edges.get(nodeConnector);
433         if (edgeProps == null) {
434             logger.debug("edgePros is null for port {} in container {}", nodeConnector, container);
435             return null;
436         }
437
438         Edge edge = edgeProps.getLeft();
439         if (edge == null) {
440             logger.debug("edge is null for port {} in container {}", nodeConnector, container);
441             return null;
442         }
443
444         // Make sure the peer port is in the same container
445         NodeConnector peerConnector = edge.getHeadNodeConnector();
446         List<String> containers = this.containerMap.get(peerConnector);
447         if ((containers == null) || !containers.contains(container)) {
448             logger.debug("peer port {} of edge {} is not part of the container {}", new Object[] { peerConnector, edge,
449                     container });
450             return null;
451         }
452
453         // Update the local cache
454         updateLocalEdgeMap(container, edge, UpdateType.ADDED, edgeProps.getRight());
455         logger.debug("Added edge {} to local cache in container {}", edge, container);
456
457         return edge;
458     }
459
460     private void addNodeConnector(String container,
461             NodeConnector nodeConnector) {
462         // Use the global edge map for the newly added port in a container
463         Map<NodeConnector, Pair<Edge, Set<Property>>> globalEdgeMap = edgeMap.get(GlobalConstants.DEFAULT
464                 .toString());
465         if (globalEdgeMap == null) {
466             return;
467         }
468
469         // Get the edge and update local cache in the container
470         Edge edge1, edge2;
471         edge1 = addEdge(container, nodeConnector, globalEdgeMap);
472         if (edge1 == null) {
473             return;
474         }
475
476         // Get the edge in reverse direction and update local cache in the container
477         NodeConnector peerConnector = edge1.getHeadNodeConnector();
478         edge2 = addEdge(container, peerConnector, globalEdgeMap);
479
480         // Send notification upwards in one shot
481         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
482         teuList.add(new TopoEdgeUpdate(edge1, null, UpdateType.ADDED));
483         logger.debug("Notify edge1: {} in container {}", edge1, container);
484         if (edge2 != null) {
485             teuList.add(new TopoEdgeUpdate(edge2, null, UpdateType.ADDED));
486             logger.debug("Notify edge2: {} in container {}", edge2, container);
487         }
488         notifyEdge(container, teuList);
489     }
490
491     private void removeNodeConnector(String container,
492             NodeConnector nodeConnector) {
493         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
494         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
495                 .get(container);
496         if (edgePropsMap == null) {
497             return;
498         }
499
500         // Remove edge in one direction
501         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
502         if (edgeProps == null) {
503             return;
504         }
505         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
506                 UpdateType.REMOVED));
507
508         // Remove edge in another direction
509         edgeProps = edgePropsMap
510                 .get(edgeProps.getLeft().getHeadNodeConnector());
511         if (edgeProps == null) {
512             return;
513         }
514         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
515                 UpdateType.REMOVED));
516
517         // Update in one shot
518         notifyEdge(container, teuList);
519     }
520
521     /**
522      * Update local cache and return true if it needs to notify upper layer
523      * Topology listeners.
524      *
525      * @param container
526      *            The network container
527      * @param edge
528      *            The edge
529      * @param type
530      *            The update type
531      * @param props
532      *            The edge properties
533      * @return true if it needs to notify upper layer Topology listeners
534      */
535     private boolean updateLocalEdgeMap(String container, Edge edge,
536             UpdateType type, Set<Property> props) {
537         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
538                 .get(container);
539         NodeConnector src = edge.getTailNodeConnector();
540         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
541                 edge, props);
542         boolean rv = false;
543
544         switch (type) {
545         case ADDED:
546         case CHANGED:
547             if (edgePropsMap == null) {
548                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
549                 rv = true;
550             } else {
551                 if (edgePropsMap.containsKey(src)
552                         && edgePropsMap.get(src).equals(edgeProps)) {
553                     // Entry already exists. No update.
554                     rv = false;
555                 } else {
556                     rv = true;
557                 }
558             }
559             if (rv) {
560                 edgePropsMap.put(src, edgeProps);
561                 edgeMap.put(container, edgePropsMap);
562             }
563             break;
564         case REMOVED:
565             if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
566                 edgePropsMap.remove(src);
567                 if (edgePropsMap.isEmpty()) {
568                     edgeMap.remove(container);
569                 } else {
570                     edgeMap.put(container, edgePropsMap);
571                 }
572                 rv = true;
573             }
574             break;
575         default:
576             logger.debug(
577                     "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
578                     new Object[] { type.getName(), edge, container });
579         }
580
581         if (rv) {
582             logger.debug(
583                     "notifyLocalEdgeMap: {} for Edge {} in container {}",
584                     new Object[] { type.getName(), edge, container });
585         }
586
587         return rv;
588     }
589
590     private void notifyEdge(String container, Edge edge, UpdateType type,
591             Set<Property> props) {
592         boolean notifyListeners;
593
594         // Update local cache
595         notifyListeners = updateLocalEdgeMap(container, edge, type, props);
596
597         // Prepare to update TopologyService
598         if (notifyListeners) {
599             notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
600                     type)));
601             logger.debug("notifyEdge: {} Edge {} in container {}",
602                     new Object[] { type.getName(), edge, container });
603         }
604     }
605
606     private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
607         if (etuList == null) {
608             return;
609         }
610
611         Edge edge;
612         UpdateType type;
613         List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
614         boolean notifyListeners = false, rv;
615
616         for (TopoEdgeUpdate etu : etuList) {
617             edge = etu.getEdge();
618             type = etu.getUpdateType();
619
620             // Update local cache
621             rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
622             if (rv) {
623                 if (!notifyListeners) {
624                     notifyListeners = true;
625                 }
626                 etuNotifyList.add(etu);
627                 logger.debug(
628                         "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
629                         new Object[] { type.getName(), edge, container });
630             }
631         }
632
633         // Prepare to update TopologyService
634         if (notifyListeners) {
635             notifyQ.add(new NotifyEntry(container, etuNotifyList));
636             logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
637         }
638     }
639
640     @Override
641     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
642         if ((edge == null) || (type == null)) {
643             return;
644         }
645
646         // Notify default container
647         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
648
649         // Notify the corresponding containers
650         List<String> containers = getEdgeContainers(edge);
651         if (containers != null) {
652             for (String container : containers) {
653                 notifyEdge(container, edge, type, props);
654             }
655         }
656     }
657
658     /*
659      * Return a list of containers the edge associated with
660      */
661     private List<String> getEdgeContainers(Edge edge) {
662         NodeConnector src = edge.getTailNodeConnector(), dst = edge
663                 .getHeadNodeConnector();
664
665         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
666             /* Find the common containers for both ends */
667             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
668                     .get(dst), cmnContainers = null;
669             if ((srcContainers != null) && (dstContainers != null)) {
670                 cmnContainers = new ArrayList<String>(srcContainers);
671                 cmnContainers.retainAll(dstContainers);
672             }
673             return cmnContainers;
674         } else {
675             /*
676              * If the neighbor is part of a monitored production network, get
677              * the containers that the edge port belongs to
678              */
679             return this.containerMap.get(dst);
680         }
681     }
682
683     @Override
684     public void tagUpdated(String containerName, Node n, short oldTag,
685             short newTag, UpdateType t) {
686     }
687
688     @Override
689     public void containerFlowUpdated(String containerName,
690             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
691     }
692
693     @Override
694     public void nodeConnectorUpdated(String containerName, NodeConnector p,
695             UpdateType t) {
696         if (this.containerMap == null) {
697             logger.error("containerMap is NULL");
698             return;
699         }
700         List<String> containers = this.containerMap.get(p);
701         if (containers == null) {
702             containers = new CopyOnWriteArrayList<String>();
703         }
704         switch (t) {
705         case ADDED:
706             if (!containers.contains(containerName)) {
707                 containers.add(containerName);
708                 updateContainerMap(containers, p);
709                 addNodeConnector(containerName, p);
710             }
711             break;
712         case REMOVED:
713             if (containers.contains(containerName)) {
714                 containers.remove(containerName);
715                 updateContainerMap(containers, p);
716                 removeNodeConnector(containerName, p);
717             }
718             break;
719         case CHANGED:
720             break;
721         }
722     }
723
724     @Override
725     public void containerModeUpdated(UpdateType t) {
726         // do nothing
727     }
728
729     private void registerWithOSGIConsole() {
730         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
731                 .getBundleContext();
732         bundleContext.registerService(CommandProvider.class.getName(), this,
733                 null);
734     }
735
736     @Override
737     public String getHelp() {
738         StringBuffer help = new StringBuffer();
739         help.append("---Topology Service Shim---\n");
740         help.append("\t pem [container]               - Print edgeMap entries");
741         help.append(" for a given container\n");
742         return help.toString();
743     }
744
745     public void _pem(CommandInterpreter ci) {
746         String container = ci.nextArgument();
747         if (container == null) {
748             container = GlobalConstants.DEFAULT.toString();
749         }
750
751         ci.println("Container: " + container);
752         ci.println("                             Edge                                          Bandwidth");
753
754         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
755                 .get(container);
756         if (edgePropsMap == null) {
757             return;
758         }
759         int count = 0;
760         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
761             if (edgeProps == null) {
762                 continue;
763             }
764
765             long bw = 0;
766             Set<Property> props = edgeProps.getRight();
767             if (props != null) {
768                 for (Property prop : props) {
769                     if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
770                         bw = ((Bandwidth) prop).getValue();
771                     }
772                 }
773             }
774             count++;
775             ci.println(edgeProps.getLeft() + "          " + bw);
776         }
777         ci.println("Total number of Edges: " + count);
778     }
779
780     public void _bwfactor(CommandInterpreter ci) {
781         String factorString = ci.nextArgument();
782         if (factorString == null) {
783             ci.println("Bw threshold: " + this.bwThresholdFactor);
784             ci.println("Insert a non null bw threshold");
785             return;
786         }
787         bwThresholdFactor = Float.parseFloat(factorString);
788         ci.println("New Bw threshold: " + this.bwThresholdFactor);
789     }
790
791     /**
792      * This method will trigger topology updates to be sent toward SAL. SAL then
793      * pushes the updates to ALL the applications that have registered as
794      * listeners for this service. SAL has no way of knowing which application
795      * requested for the refresh.
796      *
797      * As an example of this case, is stopping and starting the Topology
798      * Manager. When the topology Manager is stopped, and restarted, it will no
799      * longer have the latest topology. Hence, a request is sent here.
800      *
801      * @param containerName
802      * @return void
803      */
804     @Override
805     public void requestRefresh(String containerName) {
806         // wake up a bulk update thread and exit
807         // the thread will execute the bulkUpdate()
808         bulkNotifyQ.add(containerName);
809     }
810
811     /**
812      * Retrieve the edges for a given container
813      *
814      * @param containerName
815      *            the container name
816      * @return the edges and their properties
817      */
818     private Collection<Pair<Edge, Set<Property>>> getEdgeProps(String containerName) {
819         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
820         edgePropMap = edgeMap.get(containerName);
821         if (edgePropMap == null) {
822             return null;
823         }
824         return edgePropMap.values();
825     }
826
827     /**
828      * Reading the current topology database, the method will replay all the
829      * edge updates for the ITopologyServiceShimListener instance in the given
830      * container, which will in turn publish them toward SAL.
831      *
832      * @param containerName
833      *            the container name
834      */
835     private void TopologyBulkUpdate(String containerName) {
836         Collection<Pair<Edge, Set<Property>>> edgeProps = null;
837
838         logger.debug("Try bulk update for container:{}", containerName);
839         edgeProps = getEdgeProps(containerName);
840         if (edgeProps == null) {
841             logger.debug("No edges known for container:{}", containerName);
842             return;
843         }
844         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
845                 .get(containerName);
846         if (topologServiceShimListener == null) {
847             logger.debug("No topology service shim listener for container:{}",
848                     containerName);
849             return;
850         }
851         int i = 0;
852         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
853         for (Pair<Edge, Set<Property>> edgeProp : edgeProps) {
854             if (edgeProp != null) {
855                 i++;
856                 teuList.add(new TopoEdgeUpdate(edgeProp.getLeft(), edgeProp
857                         .getRight(), UpdateType.ADDED));
858                 logger.trace("Add edge {}", edgeProp.getLeft());
859             }
860         }
861         if (i > 0) {
862             topologServiceShimListener.edgeUpdate(teuList);
863         }
864         logger.debug("Sent {} updates", i);
865     }
866
867     @Override
868     public void updateNode(Node node, UpdateType type, Set<Property> props) {
869     }
870
871     @Override
872     public void updateNodeConnector(NodeConnector nodeConnector,
873             UpdateType type, Set<Property> props) {
874         List<String> containers = new ArrayList<String>();
875         List<String> conList = this.containerMap.get(nodeConnector);
876
877         containers.add(GlobalConstants.DEFAULT.toString());
878         if (conList != null) {
879             containers.addAll(conList);
880         }
881
882         switch (type) {
883         case ADDED:
884             break;
885         case CHANGED:
886             if (props == null) {
887                 break;
888             }
889
890             boolean rmEdge = false;
891             for (Property prop : props) {
892                 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
893                         || ((prop instanceof State) && (((State) prop)
894                                 .getValue() != State.EDGE_UP))) {
895                     /*
896                      * If port admin down or link down, remove the edges
897                      * associated with the port
898                      */
899                     rmEdge = true;
900                     break;
901                 }
902             }
903
904             if (rmEdge) {
905                 for (String cName : containers) {
906                     removeNodeConnector(cName, nodeConnector);
907                 }
908             }
909             break;
910         case REMOVED:
911             for (String cName : containers) {
912                 removeNodeConnector(cName, nodeConnector);
913             }
914             break;
915         default:
916             break;
917         }
918     }
919
920     @Override
921     public void containerCreate(String containerName) {
922         // do nothing
923     }
924
925     @Override
926     public void containerDestroy(String containerName) {
927         Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
928         for (Map.Entry<NodeConnector, List<String>> entry : containerMap.entrySet()) {
929             List<String> ncContainers = entry.getValue();
930             if (ncContainers.contains(containerName)) {
931                 NodeConnector nodeConnector = entry.getKey();
932                 removeNodeConnectorSet.add(nodeConnector);
933             }
934         }
935         for (NodeConnector nodeConnector : removeNodeConnectorSet) {
936             List<String> ncContainers = containerMap.get(nodeConnector);
937             ncContainers.remove(containerName);
938             if (ncContainers.isEmpty()) {
939                 containerMap.remove(nodeConnector);
940             }
941         }
942         edgeMap.remove(containerName);
943     }
944 }