Merge "Bug 509: Improve logging in InMemoryDataStore."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.Timer;
19 import java.util.TimerTask;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang3.tuple.ImmutablePair;
27 import org.apache.commons.lang3.tuple.Pair;
28 import org.eclipse.osgi.framework.console.CommandInterpreter;
29 import org.eclipse.osgi.framework.console.CommandProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
34 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
35 import org.opendaylight.controller.sal.core.Bandwidth;
36 import org.opendaylight.controller.sal.core.Config;
37 import org.opendaylight.controller.sal.core.ContainerFlow;
38 import org.opendaylight.controller.sal.core.Edge;
39 import org.opendaylight.controller.sal.core.IContainerAware;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.State;
45 import org.opendaylight.controller.sal.core.UpdateType;
46 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.osgi.framework.BundleContext;
49 import org.osgi.framework.FrameworkUtil;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * The class describes a shim layer that relays the topology events from
55  * OpenFlow core to various listeners. The notifications are filtered based on
56  * container configurations.
57  */
58 public class TopologyServiceShim implements IDiscoveryListener,
59         IContainerListener, CommandProvider, IRefreshInternalProvider,
60         IInventoryShimExternalListener, IContainerAware {
61     protected static final Logger logger = LoggerFactory
62             .getLogger(TopologyServiceShim.class);
63     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
64     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
65     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
66
67     private BlockingQueue<NotifyEntry> notifyQ;
68     private Thread notifyThread;
69     private BlockingQueue<String> bulkNotifyQ;
70     private Thread ofPluginTopoBulkUpdate;
71     private volatile Boolean shuttingDown = false;
72     private IOFStatisticsManager statsMgr;
73     private Timer pollTimer;
74     private TimerTask txRatePoller;
75     private Thread bwUtilNotifyThread;
76     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
77     private List<NodeConnector> connectorsOverUtilized;
78     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
79                                                    // bandwidth
80
81     class NotifyEntry {
82         String container;
83         List<TopoEdgeUpdate> teuList;
84
85         public NotifyEntry(String container, TopoEdgeUpdate teu) {
86             this.container = container;
87             this.teuList = new ArrayList<TopoEdgeUpdate>();
88             if (teu != null) {
89                 this.teuList.add(teu);
90             }
91         }
92
93         public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
94             this.container = container;
95             this.teuList = new ArrayList<TopoEdgeUpdate>();
96             if (teuList != null) {
97                 this.teuList.addAll(teuList);
98             }
99         }
100     }
101
102     class TopologyNotify implements Runnable {
103         private final BlockingQueue<NotifyEntry> notifyQ;
104         private NotifyEntry entry;
105         private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
106         private List<TopoEdgeUpdate> teuList;
107         private boolean notifyListeners;
108
109         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
110             this.notifyQ = notifyQ;
111         }
112
113         @Override
114         public void run() {
115             while (true) {
116                 try {
117                     teuMap.clear();
118                     notifyListeners = false;
119                     while (!notifyQ.isEmpty()) {
120                         entry = notifyQ.take();
121                         teuList = teuMap.get(entry.container);
122                         if (teuList == null) {
123                             teuList = new ArrayList<TopoEdgeUpdate>();
124                         }
125                         // group all the updates together
126                         teuList.addAll(entry.teuList);
127                         teuMap.put(entry.container, teuList);
128                         notifyListeners = true;
129                     }
130
131                     if (notifyListeners) {
132                         for (String container : teuMap.keySet()) {
133                             // notify the listener
134                             ITopologyServiceShimListener l = topologyServiceShimListeners.get(container);
135                             // container topology service may not have come up yet
136                             if (l != null) {
137                                 l.edgeUpdate(teuMap.get(container));
138                             }
139                         }
140                     }
141
142                     Thread.sleep(100);
143                 } catch (InterruptedException e1) {
144                     logger.trace("TopologyNotify interrupted {}",
145                             e1.getMessage());
146                     if (shuttingDown) {
147                         return;
148                     }
149                 } catch (Exception e2) {
150                     logger.error("", e2);
151                 }
152             }
153         }
154     }
155
156     class UtilizationUpdate {
157         NodeConnector connector;
158         UpdateType type;
159
160         UtilizationUpdate(NodeConnector connector, UpdateType type) {
161             this.connector = connector;
162             this.type = type;
163         }
164     }
165
166     class BwUtilizationNotify implements Runnable {
167         private final BlockingQueue<UtilizationUpdate> notifyQ;
168
169         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
170             this.notifyQ = notifyQ;
171         }
172
173         @Override
174         public void run() {
175             while (true) {
176                 try {
177                     UtilizationUpdate update = notifyQ.take();
178                     NodeConnector connector = update.connector;
179                     Set<String> containerList = edgeMap.keySet();
180                     for (String container : containerList) {
181                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
182                                 .get(container);
183                         // the edgePropsMap for a particular container may not have
184                         // the connector.
185                         // so check for null
186                         Pair<Edge, Set<Property>> edgeProp = edgePropsMap.get(connector);
187                         if(edgeProp != null) {
188                             Edge edge = edgeProp.getLeft();
189                             if (edge.getTailNodeConnector().equals(connector)) {
190                                 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
191                                         .get(container);
192                                 if (update.type == UpdateType.ADDED) {
193                                     topologServiceShimListener
194                                     .edgeOverUtilized(edge);
195                                 } else {
196                                     topologServiceShimListener
197                                     .edgeUtilBackToNormal(edge);
198                                 }
199                             }
200                         }
201                     }
202                 } catch (InterruptedException e1) {
203                     logger.trace(
204                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
205                             e1.getMessage());
206                     if (shuttingDown) {
207                         return;
208                     }
209                 } catch (Exception e2) {
210                     logger.error("", e2);
211                 }
212             }
213         }
214     }
215
216     /**
217      * Function called by the dependency manager when all the required
218      * dependencies are satisfied
219      *
220      */
221     void init() {
222         logger.trace("Init called");
223         connectorsOverUtilized = new ArrayList<NodeConnector>();
224         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
225         notifyThread = new Thread(new TopologyNotify(notifyQ));
226         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
227         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
228         bulkNotifyQ = new LinkedBlockingQueue<String>();
229         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
230             @Override
231             public void run() {
232                 while (true) {
233                     try {
234                         String containerName = bulkNotifyQ.take();
235                         logger.debug("Bulk Notify container:{}", containerName);
236                         TopologyBulkUpdate(containerName);
237                     } catch (InterruptedException e) {
238                         logger.trace("Topology Bulk update thread interrupted");
239                         if (shuttingDown) {
240                             return;                        }
241                     }
242                 }
243             }
244         }, "Topology Bulk Update");
245
246         // Initialize node connector tx bit rate poller timer
247         pollTimer = new Timer();
248         txRatePoller = new TimerTask() {
249             @Override
250             public void run() {
251                 pollTxBitRates();
252             }
253         };
254
255         registerWithOSGIConsole();
256     }
257
258     /**
259      * Continuously polls the transmit bit rate for all the node connectors from
260      * statistics manager and trigger the warning notification upward when the
261      * transmit rate is above a threshold which is a percentage of the edge
262      * bandwidth
263      */
264     protected void pollTxBitRates() {
265         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
266                 .get(GlobalConstants.DEFAULT.toString());
267         if (shuttingDown) {
268             logger.trace("Getting out the pollTxBitRates because bundle going down");
269             return;
270         }
271         if (globalContainerEdges == null) {
272             return;
273         }
274
275         for (NodeConnector connector : globalContainerEdges.keySet()) {
276             // Skip if node connector belongs to production switch
277             if (connector.getType().equals(
278                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
279                 continue;
280             }
281
282             // Get edge for which this node connector is head
283             Pair<Edge, Set<Property>> props = this.edgeMap.get(
284                     GlobalConstants.DEFAULT.toString()).get(connector);
285             // On switch mgr restart the props get reset
286             if (props == null) {
287                 continue;
288             }
289             Set<Property> propSet = props.getRight();
290             if (propSet == null) {
291                 continue;
292             }
293
294             float bw = 0;
295             for (Property prop : propSet) {
296                 if (prop instanceof Bandwidth) {
297                     bw = ((Bandwidth) prop).getValue();
298                     break;
299                 }
300             }
301
302             // Skip if agent did not provide a bandwidth info for the edge
303             if (bw == 0) {
304                 continue;
305             }
306
307             // Compare bandwidth usage
308             Long switchId = (Long) connector.getNode().getID();
309             Short port = (Short) connector.getID();
310             if (statsMgr != null) {
311                 float rate = statsMgr.getTransmitRate(switchId, port);
312                 if (rate > bwThresholdFactor * bw) {
313                     if (!connectorsOverUtilized.contains(connector)) {
314                         connectorsOverUtilized.add(connector);
315                         this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.ADDED));
316                     }
317                 } else {
318                     if (connectorsOverUtilized.contains(connector)) {
319                         connectorsOverUtilized.remove(connector);
320                         this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.REMOVED));
321                     }
322                 }
323             }
324         }
325
326     }
327
328     /**
329      * Function called by the dependency manager when at least one dependency
330      * become unsatisfied or when the component is shutting down because for
331      * example bundle is being stopped.
332      *
333      */
334     void destroy() {
335         logger.trace("DESTROY called!");
336         notifyQ = null;
337         notifyThread = null;
338     }
339
340     /**
341      * Function called by dependency manager after "init ()" is called and after
342      * the services provided by the class are registered in the service registry
343      *
344      */
345     void start() {
346         logger.trace("START called!");
347         notifyThread.start();
348         bwUtilNotifyThread.start();
349         ofPluginTopoBulkUpdate.start();
350         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
351     }
352
353     /**
354      * Function called by the dependency manager before the services exported by
355      * the component are unregistered, this will be followed by a "destroy ()"
356      * calls
357      *
358      */
359     void stop() {
360         logger.trace("STOP called!");
361         shuttingDown = true;
362         notifyThread.interrupt();
363         bwUtilNotifyThread.interrupt();
364         ofPluginTopoBulkUpdate.interrupt();
365         pollTimer.cancel();
366     }
367
368     void setTopologyServiceShimListener(Map<?, ?> props,
369             ITopologyServiceShimListener s) {
370         if (props == null) {
371             logger.error("Didn't receive the service properties");
372             return;
373         }
374         String containerName = (String) props.get("containerName");
375         if (containerName == null) {
376             logger.error("containerName not supplied");
377             return;
378         }
379         if ((this.topologyServiceShimListeners != null)
380                 && !this.topologyServiceShimListeners
381                         .containsKey(containerName)) {
382             this.topologyServiceShimListeners.put(containerName, s);
383             logger.trace("Added topologyServiceShimListener for container: {}",
384                     containerName);
385         }
386     }
387
388     void unsetTopologyServiceShimListener(Map<?, ?> props,
389             ITopologyServiceShimListener s) {
390         if (props == null) {
391             logger.error("Didn't receive the service properties");
392             return;
393         }
394         String containerName = (String) props.get("containerName");
395         if (containerName == null) {
396             logger.error("containerName not supplied");
397             return;
398         }
399         if ((this.topologyServiceShimListeners != null)
400                 && this.topologyServiceShimListeners.containsKey(containerName)
401                 && this.topologyServiceShimListeners.get(containerName).equals(
402                         s)) {
403             this.topologyServiceShimListeners.remove(containerName);
404             logger.trace(
405                     "Removed topologyServiceShimListener for container: {}",
406                     containerName);
407         }
408     }
409
410     void setStatisticsManager(IOFStatisticsManager s) {
411         this.statsMgr = s;
412     }
413
414     void unsetStatisticsManager(IOFStatisticsManager s) {
415         if (this.statsMgr == s) {
416             this.statsMgr = null;
417         }
418     }
419
420     private void updateContainerMap(List<String> containers, NodeConnector p) {
421         if (containers.isEmpty()) {
422             // Do cleanup to reduce memory footprint if no
423             // elements to be tracked
424             this.containerMap.remove(p);
425         } else {
426             this.containerMap.put(p, containers);
427         }
428     }
429
430     /**
431      * From a given edge map, retrieve the edge sourced by the port and update
432      * the local cache in the container
433      *
434      * @param container
435      *            the container name
436      * @param nodeConnector
437      *            the node connector
438      * @param edges
439      *            the given edge map
440      * @return the found edge
441      */
442     private Edge addEdge(String container, NodeConnector nodeConnector,
443             Map<NodeConnector, Pair<Edge, Set<Property>>> edges) {
444         logger.debug("Search edge sourced by port {} in container {}", nodeConnector, container);
445
446         // Retrieve the associated edge
447         Pair<Edge, Set<Property>> edgeProps = edges.get(nodeConnector);
448         if (edgeProps == null) {
449             logger.debug("edgePros is null for port {} in container {}", nodeConnector, container);
450             return null;
451         }
452
453         Edge edge = edgeProps.getLeft();
454         if (edge == null) {
455             logger.debug("edge is null for port {} in container {}", nodeConnector, container);
456             return null;
457         }
458
459         // Make sure the peer port is in the same container
460         NodeConnector peerConnector = edge.getHeadNodeConnector();
461         List<String> containers = this.containerMap.get(peerConnector);
462         if ((containers == null) || !containers.contains(container)) {
463             logger.debug("peer port {} of edge {} is not part of the container {}", new Object[] { peerConnector, edge,
464                     container });
465             return null;
466         }
467
468         // Update the local cache
469         updateLocalEdgeMap(container, edge, UpdateType.ADDED, edgeProps.getRight());
470         logger.debug("Added edge {} to local cache in container {}", edge, container);
471
472         return edge;
473     }
474
475     private void addNodeConnector(String container,
476             NodeConnector nodeConnector) {
477         // Use the global edge map for the newly added port in a container
478         Map<NodeConnector, Pair<Edge, Set<Property>>> globalEdgeMap = edgeMap.get(GlobalConstants.DEFAULT
479                 .toString());
480         if (globalEdgeMap == null) {
481             return;
482         }
483
484         // Get the edge and update local cache in the container
485         Edge edge1, edge2;
486         edge1 = addEdge(container, nodeConnector, globalEdgeMap);
487         if (edge1 == null) {
488             return;
489         }
490
491         // Get the edge in reverse direction and update local cache in the container
492         NodeConnector peerConnector = edge1.getHeadNodeConnector();
493         edge2 = addEdge(container, peerConnector, globalEdgeMap);
494
495         // Send notification upwards in one shot
496         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
497         teuList.add(new TopoEdgeUpdate(edge1, null, UpdateType.ADDED));
498         logger.debug("Notify edge1: {} in container {}", edge1, container);
499         if (edge2 != null) {
500             teuList.add(new TopoEdgeUpdate(edge2, null, UpdateType.ADDED));
501             logger.debug("Notify edge2: {} in container {}", edge2, container);
502         }
503         notifyEdge(container, teuList);
504     }
505
506     private void removeNodeConnector(String container,
507             NodeConnector nodeConnector) {
508         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
509         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
510                 .get(container);
511         if (edgePropsMap == null) {
512             return;
513         }
514
515         // Remove edge in one direction
516         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
517         if (edgeProps == null) {
518             return;
519         }
520         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
521                 UpdateType.REMOVED));
522
523         // Remove edge in another direction
524         edgeProps = edgePropsMap
525                 .get(edgeProps.getLeft().getHeadNodeConnector());
526         if (edgeProps == null) {
527             return;
528         }
529         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
530                 UpdateType.REMOVED));
531
532         // Update in one shot
533         notifyEdge(container, teuList);
534     }
535
536     /**
537      * Update local cache and return true if it needs to notify upper layer
538      * Topology listeners.
539      *
540      * @param container
541      *            The network container
542      * @param edge
543      *            The edge
544      * @param type
545      *            The update type
546      * @param props
547      *            The edge properties
548      * @return true if it needs to notify upper layer Topology listeners
549      */
550     private boolean updateLocalEdgeMap(String container, Edge edge,
551             UpdateType type, Set<Property> props) {
552         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
553                 .get(container);
554         NodeConnector src = edge.getTailNodeConnector();
555         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
556                 edge, props);
557         boolean rv = false;
558
559         switch (type) {
560         case ADDED:
561         case CHANGED:
562             if (edgePropsMap == null) {
563                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
564                 rv = true;
565             } else {
566                 if (edgePropsMap.containsKey(src)
567                         && edgePropsMap.get(src).equals(edgeProps)) {
568                     // Entry already exists. No update.
569                     rv = false;
570                 } else {
571                     rv = true;
572                 }
573             }
574             if (rv) {
575                 edgePropsMap.put(src, edgeProps);
576                 edgeMap.put(container, edgePropsMap);
577             }
578             break;
579         case REMOVED:
580             if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
581                 edgePropsMap.remove(src);
582                 if (edgePropsMap.isEmpty()) {
583                     edgeMap.remove(container);
584                 } else {
585                     edgeMap.put(container, edgePropsMap);
586                 }
587                 rv = true;
588             }
589             break;
590         default:
591             logger.debug(
592                     "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
593                     new Object[] { type.getName(), edge, container });
594         }
595
596         if (rv) {
597             logger.debug(
598                     "notifyLocalEdgeMap: {} for Edge {} in container {}",
599                     new Object[] { type.getName(), edge, container });
600         }
601
602         return rv;
603     }
604
605     private void notifyEdge(String container, Edge edge, UpdateType type,
606             Set<Property> props) {
607         boolean notifyListeners;
608
609         // Update local cache
610         notifyListeners = updateLocalEdgeMap(container, edge, type, props);
611
612         // Prepare to update TopologyService
613         if (notifyListeners) {
614             notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
615                     type)));
616             logger.debug("notifyEdge: {} Edge {} in container {}",
617                     new Object[] { type.getName(), edge, container });
618         }
619     }
620
621     private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
622         if (etuList == null) {
623             return;
624         }
625
626         Edge edge;
627         UpdateType type;
628         List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
629         boolean notifyListeners = false, rv;
630
631         for (TopoEdgeUpdate etu : etuList) {
632             edge = etu.getEdge();
633             type = etu.getUpdateType();
634
635             // Update local cache
636             rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
637             if (rv) {
638                 if (!notifyListeners) {
639                     notifyListeners = true;
640                 }
641                 etuNotifyList.add(etu);
642                 logger.debug(
643                         "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
644                         new Object[] { type.getName(), edge, container });
645             }
646         }
647
648         // Prepare to update TopologyService
649         if (notifyListeners) {
650             notifyQ.add(new NotifyEntry(container, etuNotifyList));
651             logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
652         }
653     }
654
655     @Override
656     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
657         if ((edge == null) || (type == null)) {
658             return;
659         }
660
661         // Notify default container
662         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
663
664         // Notify the corresponding containers
665         List<String> containers = getEdgeContainers(edge);
666         if (containers != null) {
667             for (String container : containers) {
668                 notifyEdge(container, edge, type, props);
669             }
670         }
671     }
672
673     /*
674      * Return a list of containers the edge associated with
675      */
676     private List<String> getEdgeContainers(Edge edge) {
677         NodeConnector src = edge.getTailNodeConnector(), dst = edge
678                 .getHeadNodeConnector();
679
680         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
681             /* Find the common containers for both ends */
682             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
683                     .get(dst), cmnContainers = null;
684             if ((srcContainers != null) && (dstContainers != null)) {
685                 cmnContainers = new ArrayList<String>(srcContainers);
686                 cmnContainers.retainAll(dstContainers);
687             }
688             return cmnContainers;
689         } else {
690             /*
691              * If the neighbor is part of a monitored production network, get
692              * the containers that the edge port belongs to
693              */
694             return this.containerMap.get(dst);
695         }
696     }
697
698     @Override
699     public void tagUpdated(String containerName, Node n, short oldTag,
700             short newTag, UpdateType t) {
701     }
702
703     @Override
704     public void containerFlowUpdated(String containerName,
705             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
706     }
707
708     @Override
709     public void nodeConnectorUpdated(String containerName, NodeConnector p,
710             UpdateType t) {
711         if (this.containerMap == null) {
712             logger.error("containerMap is NULL");
713             return;
714         }
715         List<String> containers = this.containerMap.get(p);
716         if (containers == null) {
717             containers = new CopyOnWriteArrayList<String>();
718         }
719         switch (t) {
720         case ADDED:
721             if (!containers.contains(containerName)) {
722                 containers.add(containerName);
723                 updateContainerMap(containers, p);
724                 addNodeConnector(containerName, p);
725             }
726             break;
727         case REMOVED:
728             if (containers.contains(containerName)) {
729                 containers.remove(containerName);
730                 updateContainerMap(containers, p);
731                 removeNodeConnector(containerName, p);
732             }
733             break;
734         case CHANGED:
735             break;
736         }
737     }
738
739     @Override
740     public void containerModeUpdated(UpdateType t) {
741         // do nothing
742     }
743
744     private void registerWithOSGIConsole() {
745         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
746                 .getBundleContext();
747         bundleContext.registerService(CommandProvider.class.getName(), this,
748                 null);
749     }
750
751     @Override
752     public String getHelp() {
753         StringBuffer help = new StringBuffer();
754         help.append("---Topology Service Shim---\n");
755         help.append("\t pem [container]               - Print edgeMap entries");
756         help.append(" for a given container\n");
757         return help.toString();
758     }
759
760     public void _pem(CommandInterpreter ci) {
761         String container = ci.nextArgument();
762         if (container == null) {
763             container = GlobalConstants.DEFAULT.toString();
764         }
765
766         ci.println("Container: " + container);
767         ci.println("                             Edge                                          Bandwidth");
768
769         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
770                 .get(container);
771         if (edgePropsMap == null) {
772             return;
773         }
774         int count = 0;
775         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
776             if (edgeProps == null) {
777                 continue;
778             }
779
780             long bw = 0;
781             Set<Property> props = edgeProps.getRight();
782             if (props != null) {
783                 for (Property prop : props) {
784                     if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
785                         bw = ((Bandwidth) prop).getValue();
786                     }
787                 }
788             }
789             count++;
790             ci.println(edgeProps.getLeft() + "          " + bw);
791         }
792         ci.println("Total number of Edges: " + count);
793     }
794
795     public void _bwfactor(CommandInterpreter ci) {
796         String factorString = ci.nextArgument();
797         if (factorString == null) {
798             ci.println("Bw threshold: " + this.bwThresholdFactor);
799             ci.println("Insert a non null bw threshold");
800             return;
801         }
802         bwThresholdFactor = Float.parseFloat(factorString);
803         ci.println("New Bw threshold: " + this.bwThresholdFactor);
804     }
805
806     /**
807      * This method will trigger topology updates to be sent toward SAL. SAL then
808      * pushes the updates to ALL the applications that have registered as
809      * listeners for this service. SAL has no way of knowing which application
810      * requested for the refresh.
811      *
812      * As an example of this case, is stopping and starting the Topology
813      * Manager. When the topology Manager is stopped, and restarted, it will no
814      * longer have the latest topology. Hence, a request is sent here.
815      *
816      * @param containerName
817      * @return void
818      */
819     @Override
820     public void requestRefresh(String containerName) {
821         // wake up a bulk update thread and exit
822         // the thread will execute the bulkUpdate()
823         bulkNotifyQ.add(containerName);
824     }
825
826     /**
827      * Retrieve the edges for a given container
828      *
829      * @param containerName
830      *            the container name
831      * @return the edges and their properties
832      */
833     private Collection<Pair<Edge, Set<Property>>> getEdgeProps(String containerName) {
834         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
835         edgePropMap = edgeMap.get(containerName);
836         if (edgePropMap == null) {
837             return null;
838         }
839         return edgePropMap.values();
840     }
841
842     /**
843      * Reading the current topology database, the method will replay all the
844      * edge updates for the ITopologyServiceShimListener instance in the given
845      * container, which will in turn publish them toward SAL.
846      *
847      * @param containerName
848      *            the container name
849      */
850     private void TopologyBulkUpdate(String containerName) {
851         Collection<Pair<Edge, Set<Property>>> edgeProps = null;
852
853         logger.debug("Try bulk update for container:{}", containerName);
854         edgeProps = getEdgeProps(containerName);
855         if (edgeProps == null) {
856             logger.debug("No edges known for container:{}", containerName);
857             return;
858         }
859         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
860                 .get(containerName);
861         if (topologServiceShimListener == null) {
862             logger.debug("No topology service shim listener for container:{}",
863                     containerName);
864             return;
865         }
866         int i = 0;
867         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
868         for (Pair<Edge, Set<Property>> edgeProp : edgeProps) {
869             if (edgeProp != null) {
870                 i++;
871                 teuList.add(new TopoEdgeUpdate(edgeProp.getLeft(), edgeProp
872                         .getRight(), UpdateType.ADDED));
873                 logger.trace("Add edge {}", edgeProp.getLeft());
874             }
875         }
876         if (i > 0) {
877             topologServiceShimListener.edgeUpdate(teuList);
878         }
879         logger.debug("Sent {} updates", i);
880     }
881
882     @Override
883     public void updateNode(Node node, UpdateType type, Set<Property> props) {
884     }
885
886     @Override
887     public void updateNodeConnector(NodeConnector nodeConnector,
888             UpdateType type, Set<Property> props) {
889         List<String> containers = new ArrayList<String>();
890         List<String> conList = this.containerMap.get(nodeConnector);
891
892         containers.add(GlobalConstants.DEFAULT.toString());
893         if (conList != null) {
894             containers.addAll(conList);
895         }
896
897         switch (type) {
898         case ADDED:
899             break;
900         case CHANGED:
901             if (props == null) {
902                 break;
903             }
904
905             boolean rmEdge = false;
906             for (Property prop : props) {
907                 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
908                         || ((prop instanceof State) && (((State) prop)
909                                 .getValue() != State.EDGE_UP))) {
910                     /*
911                      * If port admin down or link down, remove the edges
912                      * associated with the port
913                      */
914                     rmEdge = true;
915                     break;
916                 }
917             }
918
919             if (rmEdge) {
920                 for (String cName : containers) {
921                     removeNodeConnector(cName, nodeConnector);
922                 }
923             }
924             break;
925         case REMOVED:
926             for (String cName : containers) {
927                 removeNodeConnector(cName, nodeConnector);
928             }
929             break;
930         default:
931             break;
932         }
933     }
934
935     @Override
936     public void containerCreate(String containerName) {
937         // do nothing
938     }
939
940     @Override
941     public void containerDestroy(String containerName) {
942         Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
943         for (Map.Entry<NodeConnector, List<String>> entry : containerMap.entrySet()) {
944             List<String> ncContainers = entry.getValue();
945             if (ncContainers.contains(containerName)) {
946                 NodeConnector nodeConnector = entry.getKey();
947                 removeNodeConnectorSet.add(nodeConnector);
948             }
949         }
950         for (NodeConnector nodeConnector : removeNodeConnectorSet) {
951             List<String> ncContainers = containerMap.get(nodeConnector);
952             ncContainers.remove(containerName);
953             if (ncContainers.isEmpty()) {
954                 containerMap.remove(nodeConnector);
955             }
956         }
957         edgeMap.remove(containerName);
958     }
959 }