Merge "Add 'TableStatistics' to SAL and Northbound Statistics API."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.Timer;
17 import java.util.TimerTask;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 import org.apache.commons.lang3.tuple.ImmutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.eclipse.osgi.framework.console.CommandInterpreter;
27 import org.eclipse.osgi.framework.console.CommandProvider;
28 import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
29 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
30 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
31 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
32 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
33 import org.osgi.framework.BundleContext;
34 import org.osgi.framework.FrameworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import org.opendaylight.controller.sal.core.Bandwidth;
39 import org.opendaylight.controller.sal.core.Config;
40 import org.opendaylight.controller.sal.core.ContainerFlow;
41 import org.opendaylight.controller.sal.core.Edge;
42 import org.opendaylight.controller.sal.core.IContainerListener;
43 import org.opendaylight.controller.sal.core.Node;
44 import org.opendaylight.controller.sal.core.NodeConnector;
45 import org.opendaylight.controller.sal.core.Property;
46 import org.opendaylight.controller.sal.core.State;
47 import org.opendaylight.controller.sal.core.UpdateType;
48 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
49 import org.opendaylight.controller.sal.utils.GlobalConstants;
50
51 /**
52  * The class describes a shim layer that relays the topology events from
53  * OpenFlow core to various listeners. The notifications are filtered based on
54  * container configurations.
55  */
56 public class TopologyServiceShim implements IDiscoveryListener,
57         IContainerListener, CommandProvider, IRefreshInternalProvider,
58         IInventoryShimExternalListener {
59     protected static final Logger logger = LoggerFactory
60             .getLogger(TopologyServiceShim.class);
61     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
62     private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
63     private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
64
65     private BlockingQueue<NotifyEntry> notifyQ;
66     private Thread notifyThread;
67     private BlockingQueue<String> bulkNotifyQ;
68     private Thread ofPluginTopoBulkUpdate;
69     private volatile Boolean shuttingDown = false;
70     private IOFStatisticsManager statsMgr;
71     private Timer pollTimer;
72     private TimerTask txRatePoller;
73     private Thread bwUtilNotifyThread;
74     private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
75     private List<NodeConnector> connectorsOverUtilized;
76     private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
77                                                    // bandwidth
78
79     class NotifyEntry {
80         String container;
81         List<TopoEdgeUpdate> teuList;
82
83         public NotifyEntry(String container, TopoEdgeUpdate teu) {
84             this.container = container;
85             this.teuList = new ArrayList<TopoEdgeUpdate>();
86             if (teu != null) {
87                 this.teuList.add(teu);
88             }
89         }
90
91         public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
92             this.container = container;
93             this.teuList = new ArrayList<TopoEdgeUpdate>();
94             if (teuList != null) {
95                 this.teuList.addAll(teuList);
96             }
97         }
98     }
99
100     class TopologyNotify implements Runnable {
101         private final BlockingQueue<NotifyEntry> notifyQ;
102         private NotifyEntry entry;
103         private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
104         private List<TopoEdgeUpdate> teuList;
105         private boolean notifyListeners;
106
107         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
108             this.notifyQ = notifyQ;
109         }
110
111         public void run() {
112             while (true) {
113                 try {
114                     teuMap.clear();
115                     notifyListeners = false;
116                     while (!notifyQ.isEmpty()) {
117                         entry = notifyQ.take();
118                         teuList = teuMap.get(entry.container);
119                         if (teuList == null) {
120                             teuList = new ArrayList<TopoEdgeUpdate>();
121                         }
122                         // group all the updates together
123                         teuList.addAll(entry.teuList);
124                         teuMap.put(entry.container, teuList);
125                         notifyListeners = true;
126                     }
127
128                     if (notifyListeners) {
129                         for (String container : teuMap.keySet()) {
130                             // notify the listener
131                             topologyServiceShimListeners.get(container)
132                                     .edgeUpdate(teuMap.get(container));
133                         }
134                     }
135
136                     Thread.sleep(100);
137                 } catch (InterruptedException e1) {
138                     logger.warn("TopologyNotify interrupted {}",
139                             e1.getMessage());
140                     if (shuttingDown) {
141                         return;
142                     }
143                 } catch (Exception e2) {
144                     logger.error("", e2);
145                 }
146             }
147         }
148     }
149
150     class UtilizationUpdate {
151         NodeConnector connector;
152         UpdateType type;
153
154         UtilizationUpdate(NodeConnector connector, UpdateType type) {
155             this.connector = connector;
156             this.type = type;
157         }
158     }
159
160     class BwUtilizationNotify implements Runnable {
161         private final BlockingQueue<UtilizationUpdate> notifyQ;
162
163         BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
164             this.notifyQ = notifyQ;
165         }
166
167         public void run() {
168             while (true) {
169                 try {
170                     UtilizationUpdate update = notifyQ.take();
171                     NodeConnector connector = update.connector;
172                     Set<String> containerList = edgeMap.keySet();
173                     for (String container : containerList) {
174                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
175                                 .get(container);
176                         Edge edge = edgePropsMap.get(connector).getLeft();
177                         if (edge.getTailNodeConnector().equals(connector)) {
178                             ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
179                                     .get(container);
180                             if (update.type == UpdateType.ADDED) {
181                                 topologServiceShimListener
182                                         .edgeOverUtilized(edge);
183                             } else {
184                                 topologServiceShimListener
185                                         .edgeUtilBackToNormal(edge);
186                             }
187                         }
188                     }
189                 } catch (InterruptedException e1) {
190                     logger.warn(
191                             "Edge Bandwidth Utilization Notify Thread interrupted {}",
192                             e1.getMessage());
193                     if (shuttingDown) {
194                         return;
195                     }
196                 } catch (Exception e2) {
197                     logger.error("", e2);
198                 }
199             }
200         }
201     }
202
203     /**
204      * Function called by the dependency manager when all the required
205      * dependencies are satisfied
206      *
207      */
208     void init() {
209         logger.trace("Init called");
210         connectorsOverUtilized = new ArrayList<NodeConnector>();
211         notifyQ = new LinkedBlockingQueue<NotifyEntry>();
212         notifyThread = new Thread(new TopologyNotify(notifyQ));
213         bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
214         bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
215         bulkNotifyQ = new LinkedBlockingQueue<String>();
216         ofPluginTopoBulkUpdate = new Thread(new Runnable() {
217             @Override
218             public void run() {
219                 while (true) {
220                     try {
221                         String containerName = bulkNotifyQ.take();
222                         logger.debug("Bulk Notify container:{}", containerName);
223                         TopologyBulkUpdate(containerName);
224                     } catch (InterruptedException e) {
225                         logger.warn("Topology Bulk update thread interrupted");
226                         if (shuttingDown) {
227                             return;
228                         }
229                     }
230                 }
231             }
232         }, "Topology Bulk Update");
233
234         // Initialize node connector tx bit rate poller timer
235         pollTimer = new Timer();
236         txRatePoller = new TimerTask() {
237             @Override
238             public void run() {
239                 pollTxBitRates();
240             }
241         };
242
243         registerWithOSGIConsole();
244     }
245
246     /**
247      * Continuously polls the transmit bit rate for all the node connectors from
248      * statistics manager and trigger the warning notification upward when the
249      * transmit rate is above a threshold which is a percentage of the edge
250      * bandwidth
251      */
252     protected void pollTxBitRates() {
253         Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
254                 .get(GlobalConstants.DEFAULT.toString());
255         if (globalContainerEdges == null) {
256             return;
257         }
258
259         for (NodeConnector connector : globalContainerEdges.keySet()) {
260             // Skip if node connector belongs to production switch
261             if (connector.getType().equals(
262                     NodeConnector.NodeConnectorIDType.PRODUCTION)) {
263                 continue;
264             }
265
266             // Get edge for which this node connector is head
267             Pair<Edge, Set<Property>> props = this.edgeMap.get(
268                     GlobalConstants.DEFAULT.toString()).get(connector);
269             // On switch mgr restart the props get reset
270             if (props == null) {
271                 continue;
272             }
273             Set<Property> propSet = props.getRight();
274             if (propSet == null) {
275                 continue;
276             }
277
278             float bw = 0;
279             for (Property prop : propSet) {
280                 if (prop instanceof Bandwidth) {
281                     bw = ((Bandwidth) prop).getValue();
282                     break;
283                 }
284             }
285
286             // Skip if agent did not provide a bandwidth info for the edge
287             if (bw == 0) {
288                 continue;
289             }
290
291             // Compare bandwidth usage
292             Long switchId = (Long) connector.getNode().getID();
293             Short port = (Short) connector.getID();
294             float rate = statsMgr.getTransmitRate(switchId, port);
295             if (rate > bwThresholdFactor * bw) {
296                 if (!connectorsOverUtilized.contains(connector)) {
297                     connectorsOverUtilized.add(connector);
298                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
299                             UpdateType.ADDED));
300                 }
301             } else {
302                 if (connectorsOverUtilized.contains(connector)) {
303                     connectorsOverUtilized.remove(connector);
304                     this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
305                             UpdateType.REMOVED));
306                 }
307             }
308         }
309
310     }
311
312     /**
313      * Function called by the dependency manager when at least one dependency
314      * become unsatisfied or when the component is shutting down because for
315      * example bundle is being stopped.
316      *
317      */
318     void destroy() {
319         logger.trace("DESTROY called!");
320         notifyQ = null;
321         notifyThread = null;
322     }
323
324     /**
325      * Function called by dependency manager after "init ()" is called and after
326      * the services provided by the class are registered in the service registry
327      *
328      */
329     void start() {
330         logger.trace("START called!");
331         notifyThread.start();
332         bwUtilNotifyThread.start();
333         ofPluginTopoBulkUpdate.start();
334         pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
335     }
336
337     /**
338      * Function called by the dependency manager before the services exported by
339      * the component are unregistered, this will be followed by a "destroy ()"
340      * calls
341      *
342      */
343     void stop() {
344         logger.trace("STOP called!");
345         shuttingDown = true;
346         notifyThread.interrupt();
347     }
348
349     void setTopologyServiceShimListener(Map<?, ?> props,
350             ITopologyServiceShimListener s) {
351         if (props == null) {
352             logger.error("Didn't receive the service properties");
353             return;
354         }
355         String containerName = (String) props.get("containerName");
356         if (containerName == null) {
357             logger.error("containerName not supplied");
358             return;
359         }
360         if ((this.topologyServiceShimListeners != null)
361                 && !this.topologyServiceShimListeners
362                         .containsKey(containerName)) {
363             this.topologyServiceShimListeners.put(containerName, s);
364             logger.trace("Added topologyServiceShimListener for container: {}",
365                     containerName);
366         }
367     }
368
369     void unsetTopologyServiceShimListener(Map<?, ?> props,
370             ITopologyServiceShimListener s) {
371         if (props == null) {
372             logger.error("Didn't receive the service properties");
373             return;
374         }
375         String containerName = (String) props.get("containerName");
376         if (containerName == null) {
377             logger.error("containerName not supplied");
378             return;
379         }
380         if ((this.topologyServiceShimListeners != null)
381                 && this.topologyServiceShimListeners.containsKey(containerName)
382                 && this.topologyServiceShimListeners.get(containerName).equals(
383                         s)) {
384             this.topologyServiceShimListeners.remove(containerName);
385             logger.trace(
386                     "Removed topologyServiceShimListener for container: {}",
387                     containerName);
388         }
389     }
390
391     void setStatisticsManager(IOFStatisticsManager s) {
392         this.statsMgr = s;
393     }
394
395     void unsetStatisticsManager(IOFStatisticsManager s) {
396         if (this.statsMgr == s) {
397             this.statsMgr = null;
398         }
399     }
400
401     private void removeNodeConnector(String container,
402             NodeConnector nodeConnector) {
403         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
404         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
405                 .get(container);
406         if (edgePropsMap == null) {
407             return;
408         }
409
410         // Remove edge in one direction
411         Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
412         if (edgeProps == null) {
413             return;
414         }
415         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
416                 UpdateType.REMOVED));
417
418         // Remove edge in another direction
419         edgeProps = edgePropsMap
420                 .get(edgeProps.getLeft().getHeadNodeConnector());
421         if (edgeProps == null) {
422             return;
423         }
424         teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
425                 UpdateType.REMOVED));
426
427         // Update in one shot
428         notifyEdge(container, teuList);
429     }
430
431     /**
432      * Update local cache and return true if it needs to notify upper layer
433      * Topology listeners.
434      *
435      * @param container
436      *            The network container
437      * @param edge
438      *            The edge
439      * @param type
440      *            The update type
441      * @param props
442      *            The edge properties
443      * @return true if it needs to notify upper layer Topology listeners
444      */
445     private boolean updateLocalEdgeMap(String container, Edge edge,
446             UpdateType type, Set<Property> props) {
447         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
448                 .get(container);
449         NodeConnector src = edge.getTailNodeConnector();
450         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
451                 edge, props);
452         boolean rv = false;
453
454         switch (type) {
455         case ADDED:
456         case CHANGED:
457             if (edgePropsMap == null) {
458                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
459                 rv = true;
460             } else {
461                 if (edgePropsMap.containsKey(src)
462                         && edgePropsMap.get(src).equals(edgeProps)) {
463                     // Entry already exists. No update.
464                     rv = false;
465                 } else {
466                     rv = true;
467                 }
468             }
469             if (rv) {
470                 edgePropsMap.put(src, edgeProps);
471                 edgeMap.put(container, edgePropsMap);
472             }
473             break;
474         case REMOVED:
475             if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
476                 edgePropsMap.remove(src);
477                 if (edgePropsMap.isEmpty()) {
478                     edgeMap.remove(container);
479                 } else {
480                     edgeMap.put(container, edgePropsMap);
481                 }
482                 rv = true;
483             }
484             break;
485         default:
486             logger.debug(
487                     "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
488                     new Object[] { type.getName(), edge, container });
489         }
490
491         if (rv) {
492             logger.debug(
493                     "notifyLocalEdgeMap: {} for Edge {} in container {}",
494                     new Object[] { type.getName(), edge, container });
495         }
496
497         return rv;
498     }
499
500     private void notifyEdge(String container, Edge edge, UpdateType type,
501             Set<Property> props) {
502         boolean notifyListeners;
503
504         // Update local cache
505         notifyListeners = updateLocalEdgeMap(container, edge, type, props);
506
507         // Prepare to update TopologyService
508         if (notifyListeners) {
509             notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
510                     type)));
511             logger.debug("notifyEdge: {} Edge {} in container {}",
512                     new Object[] { type.getName(), edge, container });
513         }
514     }
515
516     private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
517         if (etuList == null) {
518             return;
519         }
520
521         Edge edge;
522         UpdateType type;
523         List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
524         boolean notifyListeners = false, rv;
525
526         for (TopoEdgeUpdate etu : etuList) {
527             edge = etu.getEdge();
528             type = etu.getUpdateType();
529
530             // Update local cache
531             rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
532             if (rv) {
533                 if (!notifyListeners) {
534                     notifyListeners = true;
535                 }
536                 etuNotifyList.add(etu);
537                 logger.debug(
538                         "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
539                         new Object[] { type.getName(), edge, container });
540             }
541         }
542
543         // Prepare to update TopologyService
544         if (notifyListeners) {
545             notifyQ.add(new NotifyEntry(container, etuNotifyList));
546             logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
547         }
548     }
549
550     @Override
551     public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
552         if ((edge == null) || (type == null)) {
553             return;
554         }
555
556         // Notify default container
557         notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
558
559         // Notify the corresponding containers
560         List<String> containers = getEdgeContainers(edge);
561         if (containers != null) {
562             for (String container : containers) {
563                 notifyEdge(container, edge, type, props);
564             }
565         }
566     }
567
568     /*
569      * Return a list of containers the edge associated with
570      */
571     private List<String> getEdgeContainers(Edge edge) {
572         NodeConnector src = edge.getTailNodeConnector(), dst = edge
573                 .getHeadNodeConnector();
574
575         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
576             /* Find the common containers for both ends */
577             List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
578                     .get(dst), cmnContainers = null;
579             if ((srcContainers != null) && (dstContainers != null)) {
580                 cmnContainers = new ArrayList<String>(srcContainers);
581                 cmnContainers.retainAll(dstContainers);
582             }
583             return cmnContainers;
584         } else {
585             /*
586              * If the neighbor is part of a monitored production network, get
587              * the containers that the edge port belongs to
588              */
589             return this.containerMap.get(dst);
590         }
591     }
592
593     @Override
594     public void tagUpdated(String containerName, Node n, short oldTag,
595             short newTag, UpdateType t) {
596     }
597
598     @Override
599     public void containerFlowUpdated(String containerName,
600             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
601     }
602
603     @Override
604     public void nodeConnectorUpdated(String containerName, NodeConnector p,
605             UpdateType t) {
606         if (this.containerMap == null) {
607             logger.error("containerMap is NULL");
608             return;
609         }
610         List<String> containers = this.containerMap.get(p);
611         if (containers == null) {
612             containers = new CopyOnWriteArrayList<String>();
613         }
614         boolean updateMap = false;
615         switch (t) {
616         case ADDED:
617             if (!containers.contains(containerName)) {
618                 containers.add(containerName);
619                 updateMap = true;
620             }
621             break;
622         case REMOVED:
623             if (containers.contains(containerName)) {
624                 containers.remove(containerName);
625                 updateMap = true;
626                 removeNodeConnector(containerName, p);
627             }
628             break;
629         case CHANGED:
630             break;
631         }
632         if (updateMap) {
633             if (containers.isEmpty()) {
634                 // Do cleanup to reduce memory footprint if no
635                 // elements to be tracked
636                 this.containerMap.remove(p);
637             } else {
638                 this.containerMap.put(p, containers);
639             }
640         }
641     }
642
643     @Override
644     public void containerModeUpdated(UpdateType t) {
645         // do nothing
646     }
647
648     private void registerWithOSGIConsole() {
649         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
650                 .getBundleContext();
651         bundleContext.registerService(CommandProvider.class.getName(), this,
652                 null);
653     }
654
655     @Override
656     public String getHelp() {
657         StringBuffer help = new StringBuffer();
658         help.append("---Topology Service Shim---\n");
659         help.append("\t pem [container]               - Print edgeMap entries");
660         help.append(" for a given container\n");
661         return help.toString();
662     }
663
664     public void _pem(CommandInterpreter ci) {
665         String container = ci.nextArgument();
666         if (container == null) {
667             container = GlobalConstants.DEFAULT.toString();
668         }
669
670         ci.println("Container: " + container);
671         ci.println("                             Edge                                          Bandwidth");
672
673         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
674                 .get(container);
675         if (edgePropsMap == null) {
676             return;
677         }
678         int count = 0;
679         for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
680             if (edgeProps == null) {
681                 continue;
682             }
683
684             long bw = 0;
685             Set<Property> props = edgeProps.getRight();
686             if (props != null) {
687                 for (Property prop : props) {
688                     if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
689                         bw = ((Bandwidth) prop).getValue();
690                     }
691                 }
692             }
693             count++;
694             ci.println(edgeProps.getLeft() + "          " + bw);
695         }
696         ci.println("Total number of Edges: " + count);
697     }
698
699     public void _bwfactor(CommandInterpreter ci) {
700         String factorString = ci.nextArgument();
701         if (factorString == null) {
702             ci.println("Bw threshold: " + this.bwThresholdFactor);
703             ci.println("Insert a non null bw threshold");
704             return;
705         }
706         bwThresholdFactor = Float.parseFloat(factorString);
707         ci.println("New Bw threshold: " + this.bwThresholdFactor);
708     }
709
710     /**
711      * This method will trigger topology updates to be sent toward SAL. SAL then
712      * pushes the updates to ALL the applications that have registered as
713      * listeners for this service. SAL has no way of knowing which application
714      * requested for the refresh.
715      *
716      * As an example of this case, is stopping and starting the Topology
717      * Manager. When the topology Manager is stopped, and restarted, it will no
718      * longer have the latest topology. Hence, a request is sent here.
719      *
720      * @param containerName
721      * @return void
722      */
723     @Override
724     public void requestRefresh(String containerName) {
725         // wake up a bulk update thread and exit
726         // the thread will execute the bulkUpdate()
727         bulkNotifyQ.add(containerName);
728     }
729
730     /**
731      * Reading the current topology database, the method will replay all the
732      * edge updates for the ITopologyServiceShimListener instance in the given
733      * container, which will in turn publish them toward SAL.
734      *
735      * @param containerName
736      */
737     private void TopologyBulkUpdate(String containerName) {
738         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
739
740         logger.debug("Try bulk update for container:{}", containerName);
741         edgePropMap = edgeMap.get(containerName);
742         if (edgePropMap == null) {
743             logger.debug("No edges known for container:{}", containerName);
744             return;
745         }
746         ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
747                 .get(containerName);
748         if (topologServiceShimListener == null) {
749             logger.debug("No topology service shim listener for container:{}",
750                     containerName);
751             return;
752         }
753         int i = 0;
754         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
755         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
756             if (edgeProps != null) {
757                 i++;
758                 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
759                         .getRight(), UpdateType.ADDED));
760                 logger.trace("Add edge {}", edgeProps.getLeft());
761             }
762         }
763         if (i > 0) {
764             topologServiceShimListener.edgeUpdate(teuList);
765         }
766         logger.debug("Sent {} updates", i);
767     }
768
769     @Override
770     public void updateNode(Node node, UpdateType type, Set<Property> props) {
771     }
772
773     @Override
774     public void updateNodeConnector(NodeConnector nodeConnector,
775             UpdateType type, Set<Property> props) {
776         List<String> containers = new ArrayList<String>();
777         List<String> conList = this.containerMap.get(nodeConnector);
778
779         containers.add(GlobalConstants.DEFAULT.toString());
780         if (conList != null) {
781             containers.addAll(conList);
782         }
783
784         switch (type) {
785         case ADDED:
786             break;
787         case CHANGED:
788             if (props == null) {
789                 break;
790             }
791
792             boolean rmEdge = false;
793             for (Property prop : props) {
794                 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
795                         || ((prop instanceof State) && (((State) prop)
796                                 .getValue() != State.EDGE_UP))) {
797                     /*
798                      * If port admin down or link down, remove the edges
799                      * associated with the port
800                      */
801                     rmEdge = true;
802                     break;
803                 }
804             }
805
806             if (rmEdge) {
807                 for (String cName : containers) {
808                     removeNodeConnector(cName, nodeConnector);
809                 }
810             }
811             break;
812         case REMOVED:
813             for (String cName : containers) {
814                 removeNodeConnector(cName, nodeConnector);
815             }
816             break;
817         default:
818             break;
819         }
820     }
821 }