2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
16 import java.util.Timer;
17 import java.util.TimerTask;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.LinkedBlockingQueue;
24 import org.apache.commons.lang3.tuple.ImmutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.eclipse.osgi.framework.console.CommandInterpreter;
27 import org.eclipse.osgi.framework.console.CommandProvider;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.FrameworkUtil;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
34 import org.opendaylight.controller.sal.core.Bandwidth;
35 import org.opendaylight.controller.sal.core.Config;
36 import org.opendaylight.controller.sal.core.ContainerFlow;
37 import org.opendaylight.controller.sal.core.Edge;
38 import org.opendaylight.controller.sal.core.IContainerListener;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.State;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
45 import org.opendaylight.controller.sal.utils.GlobalConstants;
46 import org.opendaylight.openflowplugin.openflow.IDiscoveryListener;
47 import org.opendaylight.openflowplugin.openflow.IInventoryShimExternalListener;
48 import org.opendaylight.openflowplugin.openflow.IOFStatisticsManager;
49 import org.opendaylight.openflowplugin.openflow.IRefreshInternalProvider;
50 import org.opendaylight.openflowplugin.openflow.ITopologyServiceShimListener;
53 * The class describes a shim layer that relays the topology events from
54 * OpenFlow core to various listeners. The notifications are filtered based on
55 * container configurations.
57 public class TopologyServiceShim implements IDiscoveryListener,
58 IContainerListener, CommandProvider, IRefreshInternalProvider,
59 IInventoryShimExternalListener {
60 protected static final Logger logger = LoggerFactory
61 .getLogger(TopologyServiceShim.class);
62 private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
63 private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
64 private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
66 private BlockingQueue<NotifyEntry> notifyQ;
67 private Thread notifyThread;
68 private BlockingQueue<String> bulkNotifyQ;
69 private Thread ofPluginTopoBulkUpdate;
70 private volatile Boolean shuttingDown = false;
71 private IOFStatisticsManager statsMgr;
72 private Timer pollTimer;
73 private TimerTask txRatePoller;
74 private Thread bwUtilNotifyThread;
75 private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
76 private List<NodeConnector> connectorsOverUtilized;
77 private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
82 List<TopoEdgeUpdate> teuList;
84 public NotifyEntry(String container, TopoEdgeUpdate teu) {
85 this.container = container;
86 this.teuList = new ArrayList<TopoEdgeUpdate>();
88 this.teuList.add(teu);
92 public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
93 this.container = container;
94 this.teuList = new ArrayList<TopoEdgeUpdate>();
95 if (teuList != null) {
96 this.teuList.addAll(teuList);
101 class TopologyNotify implements Runnable {
102 private final BlockingQueue<NotifyEntry> notifyQ;
103 private NotifyEntry entry;
104 private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
105 private List<TopoEdgeUpdate> teuList;
106 private boolean notifyListeners;
108 TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
109 this.notifyQ = notifyQ;
116 notifyListeners = false;
117 while (!notifyQ.isEmpty()) {
118 entry = notifyQ.take();
119 teuList = teuMap.get(entry.container);
120 if (teuList == null) {
121 teuList = new ArrayList<TopoEdgeUpdate>();
123 // group all the updates together
124 teuList.addAll(entry.teuList);
125 teuMap.put(entry.container, teuList);
126 notifyListeners = true;
129 if (notifyListeners) {
130 for (String container : teuMap.keySet()) {
131 // notify the listener
132 topologyServiceShimListeners.get(container)
133 .edgeUpdate(teuMap.get(container));
138 } catch (InterruptedException e1) {
139 logger.warn("TopologyNotify interrupted {}",
144 } catch (Exception e2) {
145 logger.error("", e2);
151 class UtilizationUpdate {
152 NodeConnector connector;
155 UtilizationUpdate(NodeConnector connector, UpdateType type) {
156 this.connector = connector;
161 class BwUtilizationNotify implements Runnable {
162 private final BlockingQueue<UtilizationUpdate> notifyQ;
164 BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
165 this.notifyQ = notifyQ;
171 UtilizationUpdate update = notifyQ.take();
172 NodeConnector connector = update.connector;
173 Set<String> containerList = edgeMap.keySet();
174 for (String container : containerList) {
175 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
177 Edge edge = edgePropsMap.get(connector).getLeft();
178 if (edge.getTailNodeConnector().equals(connector)) {
179 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
181 if (update.type == UpdateType.ADDED) {
182 topologServiceShimListener
183 .edgeOverUtilized(edge);
185 topologServiceShimListener
186 .edgeUtilBackToNormal(edge);
190 } catch (InterruptedException e1) {
192 "Edge Bandwidth Utilization Notify Thread interrupted {}",
197 } catch (Exception e2) {
198 logger.error("", e2);
205 * Function called by the dependency manager when all the required
206 * dependencies are satisfied
210 logger.trace("Init called");
211 connectorsOverUtilized = new ArrayList<NodeConnector>();
212 notifyQ = new LinkedBlockingQueue<NotifyEntry>();
213 notifyThread = new Thread(new TopologyNotify(notifyQ));
214 bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
215 bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
216 bulkNotifyQ = new LinkedBlockingQueue<String>();
217 ofPluginTopoBulkUpdate = new Thread(new Runnable() {
222 String containerName = bulkNotifyQ.take();
223 logger.debug("Bulk Notify container:{}", containerName);
224 TopologyBulkUpdate(containerName);
225 } catch (InterruptedException e) {
226 logger.warn("Topology Bulk update thread interrupted");
233 }, "Topology Bulk Update");
235 // Initialize node connector tx bit rate poller timer
236 pollTimer = new Timer();
237 txRatePoller = new TimerTask() {
244 registerWithOSGIConsole();
248 * Continuously polls the transmit bit rate for all the node connectors from
249 * statistics manager and trigger the warning notification upward when the
250 * transmit rate is above a threshold which is a percentage of the edge
253 protected void pollTxBitRates() {
254 Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
255 .get(GlobalConstants.DEFAULT.toString());
256 if (globalContainerEdges == null) {
260 for (NodeConnector connector : globalContainerEdges.keySet()) {
261 // Skip if node connector belongs to production switch
262 if (connector.getType().equals(
263 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
267 // Get edge for which this node connector is head
268 Pair<Edge, Set<Property>> props = this.edgeMap.get(
269 GlobalConstants.DEFAULT.toString()).get(connector);
270 // On switch mgr restart the props get reset
274 Set<Property> propSet = props.getRight();
275 if (propSet == null) {
280 for (Property prop : propSet) {
281 if (prop instanceof Bandwidth) {
282 bw = ((Bandwidth) prop).getValue();
287 // Skip if agent did not provide a bandwidth info for the edge
292 // Compare bandwidth usage
293 Long switchId = (Long) connector.getNode().getID();
294 Short port = (Short) connector.getID();
295 float rate = statsMgr.getTransmitRate(switchId, port);
296 if (rate > bwThresholdFactor * bw) {
297 if (!connectorsOverUtilized.contains(connector)) {
298 connectorsOverUtilized.add(connector);
299 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
303 if (connectorsOverUtilized.contains(connector)) {
304 connectorsOverUtilized.remove(connector);
305 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
306 UpdateType.REMOVED));
314 * Function called by the dependency manager when at least one dependency
315 * become unsatisfied or when the component is shutting down because for
316 * example bundle is being stopped.
320 logger.trace("DESTROY called!");
326 * Function called by dependency manager after "init ()" is called and after
327 * the services provided by the class are registered in the service registry
331 logger.trace("START called!");
332 notifyThread.start();
333 bwUtilNotifyThread.start();
334 ofPluginTopoBulkUpdate.start();
335 pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
339 * Function called by the dependency manager before the services exported by
340 * the component are unregistered, this will be followed by a "destroy ()"
345 logger.trace("STOP called!");
347 notifyThread.interrupt();
350 void setTopologyServiceShimListener(Map<?, ?> props,
351 ITopologyServiceShimListener s) {
353 logger.error("Didn't receive the service properties");
356 String containerName = (String) props.get("containerName");
357 if (containerName == null) {
358 logger.error("containerName not supplied");
361 if ((this.topologyServiceShimListeners != null)
362 && !this.topologyServiceShimListeners
363 .containsKey(containerName)) {
364 this.topologyServiceShimListeners.put(containerName, s);
365 logger.trace("Added topologyServiceShimListener for container: {}",
370 void unsetTopologyServiceShimListener(Map<?, ?> props,
371 ITopologyServiceShimListener s) {
373 logger.error("Didn't receive the service properties");
376 String containerName = (String) props.get("containerName");
377 if (containerName == null) {
378 logger.error("containerName not supplied");
381 if ((this.topologyServiceShimListeners != null)
382 && this.topologyServiceShimListeners.containsKey(containerName)
383 && this.topologyServiceShimListeners.get(containerName).equals(
385 this.topologyServiceShimListeners.remove(containerName);
387 "Removed topologyServiceShimListener for container: {}",
392 void setStatisticsManager(IOFStatisticsManager s) {
396 void unsetStatisticsManager(IOFStatisticsManager s) {
397 if (this.statsMgr == s) {
398 this.statsMgr = null;
402 IPluginOutConnectionService connectionPluginOutService;
403 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
404 connectionPluginOutService = s;
407 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
408 if (connectionPluginOutService == s) {
409 connectionPluginOutService = null;
413 private void removeNodeConnector(String container,
414 NodeConnector nodeConnector) {
415 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
416 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
418 if (edgePropsMap == null) {
422 // Remove edge in one direction
423 Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
424 if (edgeProps == null) {
427 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
428 UpdateType.REMOVED));
430 // Remove edge in another direction
431 edgeProps = edgePropsMap
432 .get(edgeProps.getLeft().getHeadNodeConnector());
433 if (edgeProps == null) {
436 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
437 UpdateType.REMOVED));
439 // Update in one shot
440 notifyEdge(container, teuList);
444 * Update local cache and return true if it needs to notify upper layer
445 * Topology listeners.
448 * The network container
454 * The edge properties
455 * @return true if it needs to notify upper layer Topology listeners
457 private boolean updateLocalEdgeMap(String container, Edge edge,
458 UpdateType type, Set<Property> props) {
459 ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
461 NodeConnector src = edge.getTailNodeConnector();
462 Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
469 if (edgePropsMap == null) {
470 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
473 if (edgePropsMap.containsKey(src)
474 && edgePropsMap.get(src).equals(edgeProps)) {
475 // Entry already exists. No update.
482 edgePropsMap.put(src, edgeProps);
483 edgeMap.put(container, edgePropsMap);
487 if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
488 edgePropsMap.remove(src);
489 if (edgePropsMap.isEmpty()) {
490 edgeMap.remove(container);
492 edgeMap.put(container, edgePropsMap);
499 "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
500 new Object[] { type.getName(), edge, container });
505 "notifyLocalEdgeMap: {} for Edge {} in container {}",
506 new Object[] { type.getName(), edge, container });
512 private void notifyEdge(String container, Edge edge, UpdateType type,
513 Set<Property> props) {
514 boolean notifyListeners;
516 // Update local cache
517 notifyListeners = updateLocalEdgeMap(container, edge, type, props);
519 // Prepare to update TopologyService
520 if (notifyListeners) {
521 notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
523 logger.debug("notifyEdge: {} Edge {} in container {}",
524 new Object[] { type.getName(), edge, container });
528 private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
529 if (etuList == null) {
535 List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
536 boolean notifyListeners = false, rv;
538 for (TopoEdgeUpdate etu : etuList) {
539 edge = etu.getEdge();
540 type = etu.getUpdateType();
542 // Update local cache
543 rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
545 if (!notifyListeners) {
546 notifyListeners = true;
548 etuNotifyList.add(etu);
550 "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
551 new Object[] { type.getName(), edge, container });
555 // Prepare to update TopologyService
556 if (notifyListeners) {
557 notifyQ.add(new NotifyEntry(container, etuNotifyList));
558 logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
563 public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
564 if ((edge == null) || (type == null)) {
568 // Notify default container
569 notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
571 // Notify the corresponding containers
572 List<String> containers = getEdgeContainers(edge);
573 if (containers != null) {
574 for (String container : containers) {
575 notifyEdge(container, edge, type, props);
581 * Return a list of containers the edge associated with
583 private List<String> getEdgeContainers(Edge edge) {
584 NodeConnector src = edge.getTailNodeConnector(), dst = edge
585 .getHeadNodeConnector();
587 if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
588 /* Find the common containers for both ends */
589 List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
590 .get(dst), cmnContainers = null;
591 if ((srcContainers != null) && (dstContainers != null)) {
592 cmnContainers = new ArrayList<String>(srcContainers);
593 cmnContainers.retainAll(dstContainers);
595 return cmnContainers;
598 * If the neighbor is part of a monitored production network, get
599 * the containers that the edge port belongs to
601 return this.containerMap.get(dst);
606 public void tagUpdated(String containerName, Node n, short oldTag,
607 short newTag, UpdateType t) {
611 public void containerFlowUpdated(String containerName,
612 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
616 public void nodeConnectorUpdated(String containerName, NodeConnector p,
618 if (this.containerMap == null) {
619 logger.error("containerMap is NULL");
622 List<String> containers = this.containerMap.get(p);
623 if (containers == null) {
624 containers = new CopyOnWriteArrayList<String>();
626 boolean updateMap = false;
629 if (!containers.contains(containerName)) {
630 containers.add(containerName);
635 if (containers.contains(containerName)) {
636 containers.remove(containerName);
638 removeNodeConnector(containerName, p);
645 if (containers.isEmpty()) {
646 // Do cleanup to reduce memory footprint if no
647 // elements to be tracked
648 this.containerMap.remove(p);
650 this.containerMap.put(p, containers);
656 public void containerModeUpdated(UpdateType t) {
660 private void registerWithOSGIConsole() {
661 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
663 bundleContext.registerService(CommandProvider.class.getName(), this,
668 public String getHelp() {
669 StringBuffer help = new StringBuffer();
670 help.append("---Topology Service Shim---\n");
671 help.append("\t pem [container] - Print edgeMap entries");
672 help.append(" for a given container\n");
673 return help.toString();
676 public void _pem(CommandInterpreter ci) {
677 String container = ci.nextArgument();
678 if (container == null) {
679 container = GlobalConstants.DEFAULT.toString();
682 ci.println("Container: " + container);
683 ci.println(" Edge Bandwidth");
685 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
687 if (edgePropsMap == null) {
691 for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
692 if (edgeProps == null) {
697 Set<Property> props = edgeProps.getRight();
699 for (Property prop : props) {
700 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
701 bw = ((Bandwidth) prop).getValue();
706 ci.println(edgeProps.getLeft() + " " + bw);
708 ci.println("Total number of Edges: " + count);
711 public void _bwfactor(CommandInterpreter ci) {
712 String factorString = ci.nextArgument();
713 if (factorString == null) {
714 ci.println("Bw threshold: " + this.bwThresholdFactor);
715 ci.println("Insert a non null bw threshold");
718 bwThresholdFactor = Float.parseFloat(factorString);
719 ci.println("New Bw threshold: " + this.bwThresholdFactor);
723 * This method will trigger topology updates to be sent toward SAL. SAL then
724 * pushes the updates to ALL the applications that have registered as
725 * listeners for this service. SAL has no way of knowing which application
726 * requested for the refresh.
728 * As an example of this case, is stopping and starting the Topology
729 * Manager. When the topology Manager is stopped, and restarted, it will no
730 * longer have the latest topology. Hence, a request is sent here.
732 * @param containerName
736 public void requestRefresh(String containerName) {
737 // wake up a bulk update thread and exit
738 // the thread will execute the bulkUpdate()
739 bulkNotifyQ.add(containerName);
743 * Reading the current topology database, the method will replay all the
744 * edge updates for the ITopologyServiceShimListener instance in the given
745 * container, which will in turn publish them toward SAL.
747 * @param containerName
749 private void TopologyBulkUpdate(String containerName) {
750 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
752 logger.debug("Try bulk update for container:{}", containerName);
753 edgePropMap = edgeMap.get(containerName);
754 if (edgePropMap == null) {
755 logger.debug("No edges known for container:{}", containerName);
758 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
760 if (topologServiceShimListener == null) {
761 logger.debug("No topology service shim listener for container:{}",
766 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
767 for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
768 if (edgeProps != null) {
770 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
771 .getRight(), UpdateType.ADDED));
772 logger.trace("Add edge {}", edgeProps.getLeft());
776 topologServiceShimListener.edgeUpdate(teuList);
778 logger.debug("Sent {} updates", i);
782 public void updateNode(Node node, UpdateType type, Set<Property> props) {
786 public void updateNodeConnector(NodeConnector nodeConnector,
787 UpdateType type, Set<Property> props) {
788 List<String> containers = new ArrayList<String>();
789 List<String> conList = this.containerMap.get(nodeConnector);
791 containers.add(GlobalConstants.DEFAULT.toString());
792 if (conList != null) {
793 containers.addAll(conList);
804 boolean rmEdge = false;
805 for (Property prop : props) {
806 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
807 || ((prop instanceof State) && (((State) prop)
808 .getValue() != State.EDGE_UP))) {
810 * If port admin down or link down, remove the edges
811 * associated with the port
819 for (String cName : containers) {
820 removeNodeConnector(cName, nodeConnector);
825 for (String cName : containers) {
826 removeNodeConnector(cName, nodeConnector);