null check for the node connector in the container's edgePropsMap
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.Timer;
19 import java.util.TimerTask;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang3.tuple.ImmutablePair;
27 import org.apache.commons.lang3.tuple.Pair;
28 import org.eclipse.osgi.framework.console.CommandInterpreter;
29 import org.eclipse.osgi.framework.console.CommandProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
34 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
35 import org.opendaylight.controller.sal.core.Bandwidth;
36 import org.opendaylight.controller.sal.core.Config;
37 import org.opendaylight.controller.sal.core.ContainerFlow;
38 import org.opendaylight.controller.sal.core.Edge;
39 import org.opendaylight.controller.sal.core.IContainerAware;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.State;
45 import org.opendaylight.controller.sal.core.UpdateType;
46 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.osgi.framework.BundleContext;
49 import org.osgi.framework.FrameworkUtil;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * The class describes a shim layer that relays the topology events from
55  * OpenFlow core to various listeners. The notifications are filtered based on
56  * container configurations.
57  */
58 public class TopologyServiceShim implements IDiscoveryListener,
59         IContainerListener, CommandProvider, IRefreshInternalProvider,
60         IInventoryShimExternalListener, IContainerAware {
61     protected static final Logger logger = LoggerFactory
62             .getLogger(TopologyServiceShim.class);
63     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
64     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
65     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
66
67     private BlockingQueue<NotifyEntry> notifyQ;
68     private Thread notifyThread;
69     private BlockingQueue<String> bulkNotifyQ;
70     private Thread ofPluginTopoBulkUpdate;
71     private volatile Boolean shuttingDown = false;
72     private IOFStatisticsManager statsMgr;
73     private Timer pollTimer;
74     private TimerTask txRatePoller;
75     private Thread bwUtilNotifyThread;
76     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
77     private List<NodeConnector> connectorsOverUtilized;
78     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
79                                                    // bandwidth
80
81     class NotifyEntry {
82         String container;
83         List<TopoEdgeUpdate> teuList;
84
85         public NotifyEntry(String container, TopoEdgeUpdate teu) {
86             this.container = container;
87             this.teuList = new ArrayList<TopoEdgeUpdate>();
88             if (teu != null) {
89                 this.teuList.add(teu);
90             }
91         }
92
93         public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
94             this.container = container;
95             this.teuList = new ArrayList<TopoEdgeUpdate>();
96             if (teuList != null) {
97                 this.teuList.addAll(teuList);
98             }
99         }
100     }
101
102     class TopologyNotify implements Runnable {
103         private final BlockingQueue<NotifyEntry> notifyQ;
104         private NotifyEntry entry;
105         private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
106         private List<TopoEdgeUpdate> teuList;
107         private boolean notifyListeners;
108
109         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
110             this.notifyQ = notifyQ;
111         }
112
113         @Override
114         public void run() {
115             while (true) {
116                 try {
117                     teuMap.clear();
118                     notifyListeners = false;
119                     while (!notifyQ.isEmpty()) {
120                         entry = notifyQ.take();
121                         teuList = teuMap.get(entry.container);
122                         if (teuList == null) {
123                             teuList = new ArrayList<TopoEdgeUpdate>();
124                         }
125                         // group all the updates together
126                         teuList.addAll(entry.teuList);
127                         teuMap.put(entry.container, teuList);
128                         notifyListeners = true;
129                     }
130
131                     if (notifyListeners) {
132                         for (String container : teuMap.keySet()) {
133                             // notify the listener
134                             ITopologyServiceShimListener l = topologyServiceShimListeners.get(container);
135                             // container topology service may not have come up yet
136                             if (l != null) {
137                                 l.edgeUpdate(teuMap.get(container));
138                             }
139                         }
140                     }
141
142                     Thread.sleep(100);
143                 } catch (InterruptedException e1) {
144                     logger.warn("TopologyNotify interrupted {}",
145                             e1.getMessage());
146                     if (shuttingDown) {
147                         return;
148                     }
149                 } catch (Exception e2) {
150                     logger.error("", e2);
151                 }
152             }
153         }
154     }
155
156     class UtilizationUpdate {
157         NodeConnector connector;
158         UpdateType type;
159
160         UtilizationUpdate(NodeConnector connector, UpdateType type) {
161             this.connector = connector;
162             this.type = type;
163         }
164     }
165
166     class BwUtilizationNotify implements Runnable {
167         private final BlockingQueue<UtilizationUpdate> notifyQ;
168
169         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
170             this.notifyQ = notifyQ;
171         }
172
173         @Override
174         public void run() {
175             while (true) {
176                 try {
177                     UtilizationUpdate update = notifyQ.take();
178                     NodeConnector connector = update.connector;
179                     Set<String> containerList = edgeMap.keySet();
180                     for (String container : containerList) {
181                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
182                                 .get(container);
183                         // the edgePropsMap for a particular container may not have
184                         // the connector.
185                         // so check for null
186                         Pair<Edge, Set<Property>> edgeProp = edgePropsMap.get(connector);
187                         if(edgeProp != null) {
188                             Edge edge = edgeProp.getLeft();
189                             if (edge.getTailNodeConnector().equals(connector)) {
190                                 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
191                                         .get(container);
192                                 if (update.type == UpdateType.ADDED) {
193                                     topologServiceShimListener
194                                     .edgeOverUtilized(edge);
195                                 } else {
196                                     topologServiceShimListener
197                                     .edgeUtilBackToNormal(edge);
198                                 }
199                             }
200                         }
201                     }
202                 } catch (InterruptedException e1) {
203                     logger.warn(
204                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
205                             e1.getMessage());
206                     if (shuttingDown) {
207                         return;
208                     }
209                 } catch (Exception e2) {
210                     logger.error("", e2);
211                 }
212             }
213         }
214     }
215
216     /**
217      * Function called by the dependency manager when all the required
218      * dependencies are satisfied
219      *
220      */
221     void init() {
222         logger.trace("Init called");
223         connectorsOverUtilized = new ArrayList<NodeConnector>();
224         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
225         notifyThread = new Thread(new TopologyNotify(notifyQ));
226         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
227         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
228         bulkNotifyQ = new LinkedBlockingQueue<String>();
229         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
230             @Override
231             public void run() {
232                 while (true) {
233                     try {
234                         String containerName = bulkNotifyQ.take();
235                         logger.debug("Bulk Notify container:{}", containerName);
236                         TopologyBulkUpdate(containerName);
237                     } catch (InterruptedException e) {
238                         logger.warn("Topology Bulk update thread interrupted");
239                         if (shuttingDown) {
240                             return;
241                         }
242                     }
243                 }
244             }
245         }, "Topology Bulk Update");
246
247         // Initialize node connector tx bit rate poller timer
248         pollTimer = new Timer();
249         txRatePoller = new TimerTask() {
250             @Override
251             public void run() {
252                 pollTxBitRates();
253             }
254         };
255
256         registerWithOSGIConsole();
257     }
258
259     /**
260      * Continuously polls the transmit bit rate for all the node connectors from
261      * statistics manager and trigger the warning notification upward when the
262      * transmit rate is above a threshold which is a percentage of the edge
263      * bandwidth
264      */
265     protected void pollTxBitRates() {
266         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
267                 .get(GlobalConstants.DEFAULT.toString());
268         if (globalContainerEdges == null) {
269             return;
270         }
271
272         for (NodeConnector connector : globalContainerEdges.keySet()) {
273             // Skip if node connector belongs to production switch
274             if (connector.getType().equals(
275                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
276                 continue;
277             }
278
279             // Get edge for which this node connector is head
280             Pair<Edge, Set<Property>> props = this.edgeMap.get(
281                     GlobalConstants.DEFAULT.toString()).get(connector);
282             // On switch mgr restart the props get reset
283             if (props == null) {
284                 continue;
285             }
286             Set<Property> propSet = props.getRight();
287             if (propSet == null) {
288                 continue;
289             }
290
291             float bw = 0;
292             for (Property prop : propSet) {
293                 if (prop instanceof Bandwidth) {
294                     bw = ((Bandwidth) prop).getValue();
295                     break;
296                 }
297             }
298
299             // Skip if agent did not provide a bandwidth info for the edge
300             if (bw == 0) {
301                 continue;
302             }
303
304             // Compare bandwidth usage
305             Long switchId = (Long) connector.getNode().getID();
306             Short port = (Short) connector.getID();
307             if (statsMgr != null) {
308                 float rate = statsMgr.getTransmitRate(switchId, port);
309                 if (rate > bwThresholdFactor * bw) {
310                     if (!connectorsOverUtilized.contains(connector)) {
311                         connectorsOverUtilized.add(connector);
312                         this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.ADDED));
313                     }
314                 } else {
315                     if (connectorsOverUtilized.contains(connector)) {
316                         connectorsOverUtilized.remove(connector);
317                         this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.REMOVED));
318                     }
319                 }
320             }
321         }
322
323     }
324
325     /**
326      * Function called by the dependency manager when at least one dependency
327      * become unsatisfied or when the component is shutting down because for
328      * example bundle is being stopped.
329      *
330      */
331     void destroy() {
332         logger.trace("DESTROY called!");
333         notifyQ = null;
334         notifyThread = null;
335     }
336
337     /**
338      * Function called by dependency manager after "init ()" is called and after
339      * the services provided by the class are registered in the service registry
340      *
341      */
342     void start() {
343         logger.trace("START called!");
344         notifyThread.start();
345         bwUtilNotifyThread.start();
346         ofPluginTopoBulkUpdate.start();
347         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
348     }
349
350     /**
351      * Function called by the dependency manager before the services exported by
352      * the component are unregistered, this will be followed by a "destroy ()"
353      * calls
354      *
355      */
356     void stop() {
357         logger.trace("STOP called!");
358         shuttingDown = true;
359         notifyThread.interrupt();
360     }
361
362     void setTopologyServiceShimListener(Map<?, ?> props,
363             ITopologyServiceShimListener s) {
364         if (props == null) {
365             logger.error("Didn't receive the service properties");
366             return;
367         }
368         String containerName = (String) props.get("containerName");
369         if (containerName == null) {
370             logger.error("containerName not supplied");
371             return;
372         }
373         if ((this.topologyServiceShimListeners != null)
374                 && !this.topologyServiceShimListeners
375                         .containsKey(containerName)) {
376             this.topologyServiceShimListeners.put(containerName, s);
377             logger.trace("Added topologyServiceShimListener for container: {}",
378                     containerName);
379         }
380     }
381
382     void unsetTopologyServiceShimListener(Map<?, ?> props,
383             ITopologyServiceShimListener s) {
384         if (props == null) {
385             logger.error("Didn't receive the service properties");
386             return;
387         }
388         String containerName = (String) props.get("containerName");
389         if (containerName == null) {
390             logger.error("containerName not supplied");
391             return;
392         }
393         if ((this.topologyServiceShimListeners != null)
394                 && this.topologyServiceShimListeners.containsKey(containerName)
395                 && this.topologyServiceShimListeners.get(containerName).equals(
396                         s)) {
397             this.topologyServiceShimListeners.remove(containerName);
398             logger.trace(
399                     "Removed topologyServiceShimListener for container: {}",
400                     containerName);
401         }
402     }
403
404     void setStatisticsManager(IOFStatisticsManager s) {
405         this.statsMgr = s;
406     }
407
408     void unsetStatisticsManager(IOFStatisticsManager s) {
409         if (this.statsMgr == s) {
410             this.statsMgr = null;
411         }
412     }
413
414     private void updateContainerMap(List<String> containers, NodeConnector p) {
415         if (containers.isEmpty()) {
416             // Do cleanup to reduce memory footprint if no
417             // elements to be tracked
418             this.containerMap.remove(p);
419         } else {
420             this.containerMap.put(p, containers);
421         }
422     }
423
424     /**
425      * From a given edge map, retrieve the edge sourced by the port and update
426      * the local cache in the container
427      *
428      * @param container
429      *            the container name
430      * @param nodeConnector
431      *            the node connector
432      * @param edges
433      *            the given edge map
434      * @return the found edge
435      */
436     private Edge addEdge(String container, NodeConnector nodeConnector,
437             Map<NodeConnector, Pair<Edge, Set<Property>>> edges) {
438         logger.debug("Search edge sourced by port {} in container {}", nodeConnector, container);
439
440         // Retrieve the associated edge
441         Pair<Edge, Set<Property>> edgeProps = edges.get(nodeConnector);
442         if (edgeProps == null) {
443             logger.debug("edgePros is null for port {} in container {}", nodeConnector, container);
444             return null;
445         }
446
447         Edge edge = edgeProps.getLeft();
448         if (edge == null) {
449             logger.debug("edge is null for port {} in container {}", nodeConnector, container);
450             return null;
451         }
452
453         // Make sure the peer port is in the same container
454         NodeConnector peerConnector = edge.getHeadNodeConnector();
455         List<String> containers = this.containerMap.get(peerConnector);
456         if ((containers == null) || !containers.contains(container)) {
457             logger.debug("peer port {} of edge {} is not part of the container {}", new Object[] { peerConnector, edge,
458                     container });
459             return null;
460         }
461
462         // Update the local cache
463         updateLocalEdgeMap(container, edge, UpdateType.ADDED, edgeProps.getRight());
464         logger.debug("Added edge {} to local cache in container {}", edge, container);
465
466         return edge;
467     }
468
469     private void addNodeConnector(String container,
470             NodeConnector nodeConnector) {
471         // Use the global edge map for the newly added port in a container
472         Map<NodeConnector, Pair<Edge, Set<Property>>> globalEdgeMap = edgeMap.get(GlobalConstants.DEFAULT
473                 .toString());
474         if (globalEdgeMap == null) {
475             return;
476         }
477
478         // Get the edge and update local cache in the container
479         Edge edge1, edge2;
480         edge1 = addEdge(container, nodeConnector, globalEdgeMap);
481         if (edge1 == null) {
482             return;
483         }
484
485         // Get the edge in reverse direction and update local cache in the container
486         NodeConnector peerConnector = edge1.getHeadNodeConnector();
487         edge2 = addEdge(container, peerConnector, globalEdgeMap);
488
489         // Send notification upwards in one shot
490         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
491         teuList.add(new TopoEdgeUpdate(edge1, null, UpdateType.ADDED));
492         logger.debug("Notify edge1: {} in container {}", edge1, container);
493         if (edge2 != null) {
494             teuList.add(new TopoEdgeUpdate(edge2, null, UpdateType.ADDED));
495             logger.debug("Notify edge2: {} in container {}", edge2, container);
496         }
497         notifyEdge(container, teuList);
498     }
499
500     private void removeNodeConnector(String container,
501             NodeConnector nodeConnector) {
502         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
503         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
504                 .get(container);
505         if (edgePropsMap == null) {
506             return;
507         }
508
509         // Remove edge in one direction
510         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
511         if (edgeProps == null) {
512             return;
513         }
514         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
515                 UpdateType.REMOVED));
516
517         // Remove edge in another direction
518         edgeProps = edgePropsMap
519                 .get(edgeProps.getLeft().getHeadNodeConnector());
520         if (edgeProps == null) {
521             return;
522         }
523         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
524                 UpdateType.REMOVED));
525
526         // Update in one shot
527         notifyEdge(container, teuList);
528     }
529
530     /**
531      * Update local cache and return true if it needs to notify upper layer
532      * Topology listeners.
533      *
534      * @param container
535      *            The network container
536      * @param edge
537      *            The edge
538      * @param type
539      *            The update type
540      * @param props
541      *            The edge properties
542      * @return true if it needs to notify upper layer Topology listeners
543      */
544     private boolean updateLocalEdgeMap(String container, Edge edge,
545             UpdateType type, Set<Property> props) {
546         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
547                 .get(container);
548         NodeConnector src = edge.getTailNodeConnector();
549         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
550                 edge, props);
551         boolean rv = false;
552
553         switch (type) {
554         case ADDED:
555         case CHANGED:
556             if (edgePropsMap == null) {
557                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
558                 rv = true;
559             } else {
560                 if (edgePropsMap.containsKey(src)
561                         && edgePropsMap.get(src).equals(edgeProps)) {
562                     // Entry already exists. No update.
563                     rv = false;
564                 } else {
565                     rv = true;
566                 }
567             }
568             if (rv) {
569                 edgePropsMap.put(src, edgeProps);
570                 edgeMap.put(container, edgePropsMap);
571             }
572             break;
573         case REMOVED:
574             if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
575                 edgePropsMap.remove(src);
576                 if (edgePropsMap.isEmpty()) {
577                     edgeMap.remove(container);
578                 } else {
579                     edgeMap.put(container, edgePropsMap);
580                 }
581                 rv = true;
582             }
583             break;
584         default:
585             logger.debug(
586                     "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
587                     new Object[] { type.getName(), edge, container });
588         }
589
590         if (rv) {
591             logger.debug(
592                     "notifyLocalEdgeMap: {} for Edge {} in container {}",
593                     new Object[] { type.getName(), edge, container });
594         }
595
596         return rv;
597     }
598
599     private void notifyEdge(String container, Edge edge, UpdateType type,
600             Set<Property> props) {
601         boolean notifyListeners;
602
603         // Update local cache
604         notifyListeners = updateLocalEdgeMap(container, edge, type, props);
605
606         // Prepare to update TopologyService
607         if (notifyListeners) {
608             notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
609                     type)));
610             logger.debug("notifyEdge: {} Edge {} in container {}",
611                     new Object[] { type.getName(), edge, container });
612         }
613     }
614
615     private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
616         if (etuList == null) {
617             return;
618         }
619
620         Edge edge;
621         UpdateType type;
622         List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
623         boolean notifyListeners = false, rv;
624
625         for (TopoEdgeUpdate etu : etuList) {
626             edge = etu.getEdge();
627             type = etu.getUpdateType();
628
629             // Update local cache
630             rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
631             if (rv) {
632                 if (!notifyListeners) {
633                     notifyListeners = true;
634                 }
635                 etuNotifyList.add(etu);
636                 logger.debug(
637                         "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
638                         new Object[] { type.getName(), edge, container });
639             }
640         }
641
642         // Prepare to update TopologyService
643         if (notifyListeners) {
644             notifyQ.add(new NotifyEntry(container, etuNotifyList));
645             logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
646         }
647     }
648
649     @Override
650     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
651         if ((edge == null) || (type == null)) {
652             return;
653         }
654
655         // Notify default container
656         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
657
658         // Notify the corresponding containers
659         List<String> containers = getEdgeContainers(edge);
660         if (containers != null) {
661             for (String container : containers) {
662                 notifyEdge(container, edge, type, props);
663             }
664         }
665     }
666
667     /*
668      * Return a list of containers the edge associated with
669      */
670     private List<String> getEdgeContainers(Edge edge) {
671         NodeConnector src = edge.getTailNodeConnector(), dst = edge
672                 .getHeadNodeConnector();
673
674         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
675             /* Find the common containers for both ends */
676             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
677                     .get(dst), cmnContainers = null;
678             if ((srcContainers != null) && (dstContainers != null)) {
679                 cmnContainers = new ArrayList<String>(srcContainers);
680                 cmnContainers.retainAll(dstContainers);
681             }
682             return cmnContainers;
683         } else {
684             /*
685              * If the neighbor is part of a monitored production network, get
686              * the containers that the edge port belongs to
687              */
688             return this.containerMap.get(dst);
689         }
690     }
691
692     @Override
693     public void tagUpdated(String containerName, Node n, short oldTag,
694             short newTag, UpdateType t) {
695     }
696
697     @Override
698     public void containerFlowUpdated(String containerName,
699             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
700     }
701
702     @Override
703     public void nodeConnectorUpdated(String containerName, NodeConnector p,
704             UpdateType t) {
705         if (this.containerMap == null) {
706             logger.error("containerMap is NULL");
707             return;
708         }
709         List<String> containers = this.containerMap.get(p);
710         if (containers == null) {
711             containers = new CopyOnWriteArrayList<String>();
712         }
713         switch (t) {
714         case ADDED:
715             if (!containers.contains(containerName)) {
716                 containers.add(containerName);
717                 updateContainerMap(containers, p);
718                 addNodeConnector(containerName, p);
719             }
720             break;
721         case REMOVED:
722             if (containers.contains(containerName)) {
723                 containers.remove(containerName);
724                 updateContainerMap(containers, p);
725                 removeNodeConnector(containerName, p);
726             }
727             break;
728         case CHANGED:
729             break;
730         }
731     }
732
733     @Override
734     public void containerModeUpdated(UpdateType t) {
735         // do nothing
736     }
737
738     private void registerWithOSGIConsole() {
739         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
740                 .getBundleContext();
741         bundleContext.registerService(CommandProvider.class.getName(), this,
742                 null);
743     }
744
745     @Override
746     public String getHelp() {
747         StringBuffer help = new StringBuffer();
748         help.append("---Topology Service Shim---\n");
749         help.append("\t pem [container]               - Print edgeMap entries");
750         help.append(" for a given container\n");
751         return help.toString();
752     }
753
754     public void _pem(CommandInterpreter ci) {
755         String container = ci.nextArgument();
756         if (container == null) {
757             container = GlobalConstants.DEFAULT.toString();
758         }
759
760         ci.println("Container: " + container);
761         ci.println("                             Edge                                          Bandwidth");
762
763         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
764                 .get(container);
765         if (edgePropsMap == null) {
766             return;
767         }
768         int count = 0;
769         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
770             if (edgeProps == null) {
771                 continue;
772             }
773
774             long bw = 0;
775             Set<Property> props = edgeProps.getRight();
776             if (props != null) {
777                 for (Property prop : props) {
778                     if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
779                         bw = ((Bandwidth) prop).getValue();
780                     }
781                 }
782             }
783             count++;
784             ci.println(edgeProps.getLeft() + "          " + bw);
785         }
786         ci.println("Total number of Edges: " + count);
787     }
788
789     public void _bwfactor(CommandInterpreter ci) {
790         String factorString = ci.nextArgument();
791         if (factorString == null) {
792             ci.println("Bw threshold: " + this.bwThresholdFactor);
793             ci.println("Insert a non null bw threshold");
794             return;
795         }
796         bwThresholdFactor = Float.parseFloat(factorString);
797         ci.println("New Bw threshold: " + this.bwThresholdFactor);
798     }
799
800     /**
801      * This method will trigger topology updates to be sent toward SAL. SAL then
802      * pushes the updates to ALL the applications that have registered as
803      * listeners for this service. SAL has no way of knowing which application
804      * requested for the refresh.
805      *
806      * As an example of this case, is stopping and starting the Topology
807      * Manager. When the topology Manager is stopped, and restarted, it will no
808      * longer have the latest topology. Hence, a request is sent here.
809      *
810      * @param containerName
811      * @return void
812      */
813     @Override
814     public void requestRefresh(String containerName) {
815         // wake up a bulk update thread and exit
816         // the thread will execute the bulkUpdate()
817         bulkNotifyQ.add(containerName);
818     }
819
820     /**
821      * Retrieve the edges for a given container
822      *
823      * @param containerName
824      *            the container name
825      * @return the edges and their properties
826      */
827     private Collection<Pair<Edge, Set<Property>>> getEdgeProps(String containerName) {
828         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
829         edgePropMap = edgeMap.get(containerName);
830         if (edgePropMap == null) {
831             return null;
832         }
833         return edgePropMap.values();
834     }
835
836     /**
837      * Reading the current topology database, the method will replay all the
838      * edge updates for the ITopologyServiceShimListener instance in the given
839      * container, which will in turn publish them toward SAL.
840      *
841      * @param containerName
842      *            the container name
843      */
844     private void TopologyBulkUpdate(String containerName) {
845         Collection<Pair<Edge, Set<Property>>> edgeProps = null;
846
847         logger.debug("Try bulk update for container:{}", containerName);
848         edgeProps = getEdgeProps(containerName);
849         if (edgeProps == null) {
850             logger.debug("No edges known for container:{}", containerName);
851             return;
852         }
853         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
854                 .get(containerName);
855         if (topologServiceShimListener == null) {
856             logger.debug("No topology service shim listener for container:{}",
857                     containerName);
858             return;
859         }
860         int i = 0;
861         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
862         for (Pair<Edge, Set<Property>> edgeProp : edgeProps) {
863             if (edgeProp != null) {
864                 i++;
865                 teuList.add(new TopoEdgeUpdate(edgeProp.getLeft(), edgeProp
866                         .getRight(), UpdateType.ADDED));
867                 logger.trace("Add edge {}", edgeProp.getLeft());
868             }
869         }
870         if (i > 0) {
871             topologServiceShimListener.edgeUpdate(teuList);
872         }
873         logger.debug("Sent {} updates", i);
874     }
875
876     @Override
877     public void updateNode(Node node, UpdateType type, Set<Property> props) {
878     }
879
880     @Override
881     public void updateNodeConnector(NodeConnector nodeConnector,
882             UpdateType type, Set<Property> props) {
883         List<String> containers = new ArrayList<String>();
884         List<String> conList = this.containerMap.get(nodeConnector);
885
886         containers.add(GlobalConstants.DEFAULT.toString());
887         if (conList != null) {
888             containers.addAll(conList);
889         }
890
891         switch (type) {
892         case ADDED:
893             break;
894         case CHANGED:
895             if (props == null) {
896                 break;
897             }
898
899             boolean rmEdge = false;
900             for (Property prop : props) {
901                 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
902                         || ((prop instanceof State) && (((State) prop)
903                                 .getValue() != State.EDGE_UP))) {
904                     /*
905                      * If port admin down or link down, remove the edges
906                      * associated with the port
907                      */
908                     rmEdge = true;
909                     break;
910                 }
911             }
912
913             if (rmEdge) {
914                 for (String cName : containers) {
915                     removeNodeConnector(cName, nodeConnector);
916                 }
917             }
918             break;
919         case REMOVED:
920             for (String cName : containers) {
921                 removeNodeConnector(cName, nodeConnector);
922             }
923             break;
924         default:
925             break;
926         }
927     }
928
929     @Override
930     public void containerCreate(String containerName) {
931         // do nothing
932     }
933
934     @Override
935     public void containerDestroy(String containerName) {
936         Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
937         for (Map.Entry<NodeConnector, List<String>> entry : containerMap.entrySet()) {
938             List<String> ncContainers = entry.getValue();
939             if (ncContainers.contains(containerName)) {
940                 NodeConnector nodeConnector = entry.getKey();
941                 removeNodeConnectorSet.add(nodeConnector);
942             }
943         }
944         for (NodeConnector nodeConnector : removeNodeConnectorSet) {
945             List<String> ncContainers = containerMap.get(nodeConnector);
946             ncContainers.remove(containerName);
947             if (ncContainers.isEmpty()) {
948                 containerMap.remove(nodeConnector);
949             }
950         }
951         edgeMap.remove(containerName);
952     }
953 }