2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
16 import java.util.Timer;
17 import java.util.TimerTask;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.LinkedBlockingQueue;
24 import org.apache.commons.lang3.tuple.ImmutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.eclipse.osgi.framework.console.CommandInterpreter;
27 import org.eclipse.osgi.framework.console.CommandProvider;
28 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
29 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
30 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
32 import org.osgi.framework.BundleContext;
33 import org.osgi.framework.FrameworkUtil;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import org.opendaylight.controller.sal.core.Bandwidth;
38 import org.opendaylight.controller.sal.core.Config;
39 import org.opendaylight.controller.sal.core.ContainerFlow;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.IContainerListener;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.State;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
48 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
49 import org.opendaylight.controller.sal.utils.GlobalConstants;
52 * The class describes a shim layer that relays the topology events from
53 * OpenFlow core to various listeners. The notifications are filtered based on
54 * container configurations.
56 public class TopologyServiceShim implements IDiscoveryService,
57 IContainerListener, CommandProvider, IRefreshInternalProvider,
58 IInventoryShimExternalListener {
// Logger used throughout the shim, including its worker classes.
59 protected static final Logger logger = LoggerFactory
60 .getLogger(TopologyServiceShim.class);
// Registered topology listeners, keyed by container name.
61 private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
// For each node connector, the names of the containers it has been added to.
62 private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
// Edge cache: container name -> (tail node connector of the edge -> edge and
// its properties).
63 private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
// Pending edge notifications and the thread running TopologyNotify on them.
65 private BlockingQueue<NotifyEntry> notifyQ;
66 private Thread notifyThread;
// Container names queued by requestRefresh() for a bulk topology replay, and
// the thread that serves them.
67 private BlockingQueue<String> bulkNotifyQ;
68 private Thread ofPluginTopoBulkUpdate;
// NOTE(review): presumably set on shutdown so the worker threads exit when
// interrupted; the statements that read/write it are not visible here.
69 private volatile Boolean shuttingDown = false;
// Statistics manager queried for per-port transmit rates.
70 private IOFStatisticsManager statsMgr;
// Timer and task that drive the periodic transmit-rate poll.
71 private Timer pollTimer;
72 private TimerTask txRatePoller;
// Thread and queue used to deliver bandwidth utilization changes.
73 private Thread bwUtilNotifyThread;
74 private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
// Node connectors currently reported as over-utilized.
75 private List<NodeConnector> connectorsOverUtilized;
// Fraction of the edge bandwidth above which a connector is over-utilized.
76 private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
81 List<TopoEdgeUpdate> teuList;
/**
 * Creates an entry that carries at most one edge update.
 *
 * @param container
 *            name of the container the update belongs to
 * @param teu
 *            the edge update to carry; when null the entry holds an empty
 *            list
 */
public NotifyEntry(String container, TopoEdgeUpdate teu) {
    List<TopoEdgeUpdate> updates = new ArrayList<TopoEdgeUpdate>();
    if (teu != null) {
        updates.add(teu);
    }
    this.teuList = updates;
    this.container = container;
}
/**
 * Creates an entry that carries a private copy of the given edge updates.
 *
 * @param container
 *            name of the container the updates belong to
 * @param teuList
 *            the edge updates to copy; when null the entry holds an empty
 *            list
 */
public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
    this.container = container;
    this.teuList = (teuList == null) ? new ArrayList<TopoEdgeUpdate>()
            : new ArrayList<TopoEdgeUpdate>(teuList);
}
100 class TopologyNotify implements Runnable {
101 private final BlockingQueue<NotifyEntry> notifyQ;
102 private NotifyEntry entry;
103 private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
104 private List<TopoEdgeUpdate> teuList;
105 private boolean notifyListeners;
107 TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
108 this.notifyQ = notifyQ;
115 notifyListeners = false;
116 while (!notifyQ.isEmpty()) {
117 entry = notifyQ.take();
118 teuList = teuMap.get(entry.container);
119 if (teuList == null) {
120 teuList = new ArrayList<TopoEdgeUpdate>();
122 // group all the updates together
123 teuList.addAll(entry.teuList);
124 teuMap.put(entry.container, teuList);
125 notifyListeners = true;
128 if (notifyListeners) {
129 for (String container : teuMap.keySet()) {
130 // notify the listener
131 topologyServiceShimListeners.get(container)
132 .edgeUpdate(teuMap.get(container));
137 } catch (InterruptedException e1) {
138 logger.warn("TopologyNotify interrupted {}",
143 } catch (Exception e2) {
144 logger.error("", e2);
150 class UtilizationUpdate {
151 NodeConnector connector;
154 UtilizationUpdate(NodeConnector connector, UpdateType type) {
155 this.connector = connector;
160 class BwUtilizationNotify implements Runnable {
161 private final BlockingQueue<UtilizationUpdate> notifyQ;
163 BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
164 this.notifyQ = notifyQ;
170 UtilizationUpdate update = notifyQ.take();
171 NodeConnector connector = update.connector;
172 Set<String> containerList = edgeMap.keySet();
173 for (String container : containerList) {
174 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
176 Edge edge = edgePropsMap.get(connector).getLeft();
177 if (edge.getTailNodeConnector().equals(connector)) {
178 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
180 if (update.type == UpdateType.ADDED) {
181 topologServiceShimListener
182 .edgeOverUtilized(edge);
184 topologServiceShimListener
185 .edgeUtilBackToNormal(edge);
189 } catch (InterruptedException e1) {
191 "Edge Bandwidth Utilization Notify Thread interrupted {}",
196 } catch (Exception e2) {
197 logger.error("", e2);
204 * Function called by the dependency manager when all the required
205 * dependencies are satisfied
// Creates the queues, worker threads and poll timer; nothing is started in
// the statements shown here.
209 logger.trace("Init called");
210 connectorsOverUtilized = new ArrayList<NodeConnector>();
// Edge notification queue and its consumer
211 notifyQ = new LinkedBlockingQueue<NotifyEntry>();
212 notifyThread = new Thread(new TopologyNotify(notifyQ));
// Bandwidth utilization queue and its consumer
213 bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
214 bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
// Bulk update requests: each queued container name triggers a full replay
// of that container's edges through TopologyBulkUpdate()
215 bulkNotifyQ = new LinkedBlockingQueue<String>();
216 ofPluginTopoBulkUpdate = new Thread(new Runnable() {
221 String containerName = bulkNotifyQ.take();
222 logger.debug("Bulk Notify container:{}", containerName);
223 TopologyBulkUpdate(containerName);
224 } catch (InterruptedException e) {
225 logger.warn("Topology Bulk update thread interrupted");
232 }, "Topology Bulk Update");
234 // Initialize node connector tx bit rate poller timer
235 pollTimer = new Timer();
// NOTE(review): the task body is not visible here; presumably it calls
// pollTxBitRates() - confirm.
236 txRatePoller = new TimerTask() {
// Expose this object's console commands (see registerWithOSGIConsole)
243 registerWithOSGIConsole();
247 * Continuously polls the transmit bit rate for all the node connectors from
248 * statistics manager and trigger the warning notification upward when the
249 * transmit rate is above a threshold which is a percentage of the edge
252 protected void pollTxBitRates() {
253 Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
254 .get(GlobalConstants.DEFAULT.toString());
255 if (globalContainerEdges == null) {
259 for (NodeConnector connector : globalContainerEdges.keySet()) {
260 // Skip if node connector belongs to production switch
261 if (connector.getType().equals(
262 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
266 // Get edge for which this node connector is head
267 Pair<Edge, Set<Property>> props = this.edgeMap.get(
268 GlobalConstants.DEFAULT.toString()).get(connector);
269 // On switch mgr restart the props get reset
273 Set<Property> propSet = props.getRight();
274 if (propSet == null) {
279 for (Property prop : propSet) {
280 if (prop instanceof Bandwidth) {
281 bw = ((Bandwidth) prop).getValue();
286 // Skip if agent did not provide a bandwidth info for the edge
291 // Compare bandwidth usage
292 Long switchId = (Long) connector.getNode().getID();
293 Short port = (Short) connector.getID();
294 float rate = statsMgr.getTransmitRate(switchId, port);
295 if (rate > bwThresholdFactor * bw) {
296 if (!connectorsOverUtilized.contains(connector)) {
297 connectorsOverUtilized.add(connector);
298 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
302 if (connectorsOverUtilized.contains(connector)) {
303 connectorsOverUtilized.remove(connector);
304 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
305 UpdateType.REMOVED));
313 * Function called by the dependency manager when at least one dependency
314 * becomes unsatisfied or when the component is shutting down because, for
315 * example, the bundle is being stopped.
319 logger.trace("DESTROY called!");
325 * Function called by dependency manager after "init ()" is called and after
326 * the services provided by the class are registered in the service registry
330 logger.trace("START called!");
// Start the three worker threads
331 notifyThread.start();
332 bwUtilNotifyThread.start();
333 ofPluginTopoBulkUpdate.start();
// First transmit-rate poll after 10 s, then one every 5 s (values are in
// milliseconds)
334 pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
338 * Function called by the dependency manager before the services exported by
339 * the component are unregistered, this will be followed by a "destroy ()"
344 logger.trace("STOP called!");
// NOTE(review): only notifyThread is interrupted in the statements shown
// here; bwUtilNotifyThread, ofPluginTopoBulkUpdate and pollTimer do not
// appear to be stopped - confirm they are not leaked on bundle stop.
346 notifyThread.interrupt();
349 void setTopologyServiceShimListener(Map<?, ?> props,
350 ITopologyServiceShimListener s) {
352 logger.error("Didn't receive the service properties");
355 String containerName = (String) props.get("containerName");
356 if (containerName == null) {
357 logger.error("containerName not supplied");
360 if ((this.topologyServiceShimListeners != null)
361 && !this.topologyServiceShimListeners
362 .containsKey(containerName)) {
363 this.topologyServiceShimListeners.put(containerName, s);
364 logger.trace("Added topologyServiceShimListener for container: {}",
369 void unsetTopologyServiceShimListener(Map<?, ?> props,
370 ITopologyServiceShimListener s) {
372 logger.error("Didn't receive the service properties");
375 String containerName = (String) props.get("containerName");
376 if (containerName == null) {
377 logger.error("containerName not supplied");
380 if ((this.topologyServiceShimListeners != null)
381 && this.topologyServiceShimListeners.containsKey(containerName)
382 && this.topologyServiceShimListeners.get(containerName).equals(
384 this.topologyServiceShimListeners.remove(containerName);
386 "Removed topologyServiceShimListener for container: {}",
391 void setStatisticsManager(IOFStatisticsManager s) {
395 void unsetStatisticsManager(IOFStatisticsManager s) {
396 if (this.statsMgr == s) {
397 this.statsMgr = null;
401 private void removeNodeConnector(String container,
402 NodeConnector nodeConnector) {
403 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
404 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
406 if (edgePropsMap == null) {
410 // Remove edge in one direction
411 Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
412 if (edgeProps == null) {
415 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
416 UpdateType.REMOVED));
418 // Remove edge in another direction
419 edgeProps = edgePropsMap
420 .get(edgeProps.getLeft().getHeadNodeConnector());
421 if (edgeProps == null) {
424 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
425 UpdateType.REMOVED));
427 // Update in one shot
428 notifyEdge(container, teuList);
432 * Update local cache and return true if it needs to notify upper layer
433 * Topology listeners.
436 * The network container
442 * The edge properties
443 * @return true if it needs to notify upper layer Topology listeners
445 private boolean updateLocalEdgeMap(String container, Edge edge,
446 UpdateType type, Set<Property> props) {
// Edge cache of this container; may be null when nothing is cached yet
447 ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
// The cache is keyed by the tail node connector of the edge
449 NodeConnector src = edge.getTailNodeConnector();
450 Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
// NOTE(review): the statements selecting between the branches below are not
// visible here; presumably a switch on the update type - confirm.
// Add/change path: create the per-container map on first use
457 if (edgePropsMap == null) {
458 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
// An identical (edge, properties) pair already cached means nothing changed
461 if (edgePropsMap.containsKey(src)
462 && edgePropsMap.get(src).equals(edgeProps)) {
463 // Entry already exists. No update.
// Store the new pair and (re)publish the per-container map
470 edgePropsMap.put(src, edgeProps);
471 edgeMap.put(container, edgePropsMap);
// Removal path: drop the entry and discard the container map once empty
475 if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
476 edgePropsMap.remove(src);
477 if (edgePropsMap.isEmpty()) {
478 edgeMap.remove(container);
480 edgeMap.put(container, edgePropsMap);
// Message logged for an update type that is not handled
487 "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
488 new Object[] { type.getName(), edge, container });
// Message logged when the cache was changed
493 "notifyLocalEdgeMap: {} for Edge {} in container {}",
494 new Object[] { type.getName(), edge, container });
500 private void notifyEdge(String container, Edge edge, UpdateType type,
501 Set<Property> props) {
502 boolean notifyListeners;
504 // Update local cache
505 notifyListeners = updateLocalEdgeMap(container, edge, type, props);
507 // Prepare to update TopologyService
508 if (notifyListeners) {
509 notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
511 logger.debug("notifyEdge: {} Edge {} in container {}",
512 new Object[] { type.getName(), edge, container });
516 private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
517 if (etuList == null) {
523 List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
524 boolean notifyListeners = false, rv;
526 for (TopoEdgeUpdate etu : etuList) {
527 edge = etu.getEdge();
528 type = etu.getUpdateType();
530 // Update local cache
531 rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
533 if (!notifyListeners) {
534 notifyListeners = true;
536 etuNotifyList.add(etu);
538 "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
539 new Object[] { type.getName(), edge, container });
543 // Prepare to update TopologyService
544 if (notifyListeners) {
545 notifyQ.add(new NotifyEntry(container, etuNotifyList));
546 logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
551 public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
552 if ((edge == null) || (type == null)) {
556 // Notify default container
557 notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
559 // Notify the corresponding containers
560 List<String> containers = getEdgeContainers(edge);
561 if (containers != null) {
562 for (String container : containers) {
563 notifyEdge(container, edge, type, props);
569 * Return a list of containers the edge associated with
571 private List<String> getEdgeContainers(Edge edge) {
572 NodeConnector src = edge.getTailNodeConnector(), dst = edge
573 .getHeadNodeConnector();
575 if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
576 /* Find the common containers for both ends */
577 List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
578 .get(dst), cmnContainers = null;
579 if ((srcContainers != null) && (dstContainers != null)) {
580 cmnContainers = new ArrayList<String>(srcContainers);
581 cmnContainers.retainAll(dstContainers);
583 return cmnContainers;
586 * If the neighbor is part of a monitored production network, get
587 * the containers that the edge port belongs to
589 return this.containerMap.get(dst);
// NOTE(review): no handling of container tag updates is visible for this
// callback - confirm it is intentionally a no-op.
594 public void tagUpdated(String containerName, Node n, short oldTag,
595 short newTag, UpdateType t) {
// NOTE(review): no handling of container flow updates is visible for this
// callback - confirm it is intentionally a no-op.
599 public void containerFlowUpdated(String containerName,
600 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
604 public void nodeConnectorUpdated(String containerName, NodeConnector p,
606 if (this.containerMap == null) {
607 logger.error("containerMap is NULL");
610 List<String> containers = this.containerMap.get(p);
611 if (containers == null) {
612 containers = new CopyOnWriteArrayList<String>();
614 boolean updateMap = false;
617 if (!containers.contains(containerName)) {
618 containers.add(containerName);
623 if (containers.contains(containerName)) {
624 containers.remove(containerName);
626 removeNodeConnector(containerName, p);
633 if (containers.isEmpty()) {
634 // Do cleanup to reduce memory footprint if no
635 // elements to be tracked
636 this.containerMap.remove(p);
638 this.containerMap.put(p, containers);
// NOTE(review): no handling of container mode changes is visible for this
// callback - confirm it is intentionally a no-op.
644 public void containerModeUpdated(UpdateType t) {
648 private void registerWithOSGIConsole() {
649 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
651 bundleContext.registerService(CommandProvider.class.getName(), this,
656 public String getHelp() {
657 StringBuffer help = new StringBuffer();
658 help.append("---Topology Service Shim---\n");
659 help.append("\t pem [container] - Print edgeMap entries");
660 help.append(" for a given container\n");
661 return help.toString();
// Console command: prints the cached edges of a container together with
// their bandwidth.
664 public void _pem(CommandInterpreter ci) {
// The container argument is optional; the default container is used when
// it is omitted
665 String container = ci.nextArgument();
666 if (container == null) {
667 container = GlobalConstants.DEFAULT.toString();
670 ci.println("Container: " + container);
671 ci.println(" Edge Bandwidth");
673 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
// Nothing is cached for this container
675 if (edgePropsMap == null) {
679 for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
680 if (edgeProps == null) {
685 Set<Property> props = edgeProps.getRight();
// NOTE(review): whether props is null-checked before this loop is not
// visible here - confirm, since the props of an edge may be null.
687 for (Property prop : props) {
// The bandwidth property is identified by name before the cast
688 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
689 bw = ((Bandwidth) prop).getValue();
694 ci.println(edgeProps.getLeft() + " " + bw);
696 ci.println("Total number of Edges: " + count);
699 public void _bwfactor(CommandInterpreter ci) {
700 String factorString = ci.nextArgument();
701 if (factorString == null) {
702 ci.println("Bw threshold: " + this.bwThresholdFactor);
703 ci.println("Insert a non null bw threshold");
706 bwThresholdFactor = Float.parseFloat(factorString);
707 ci.println("New Bw threshold: " + this.bwThresholdFactor);
711 * This method will trigger topology updates to be sent toward SAL. SAL then
712 * pushes the updates to ALL the applications that have registered as
713 * listeners for this service. SAL has no way of knowing which application
714 * requested for the refresh.
716 * As an example of this case, is stopping and starting the Topology
717 * Manager. When the topology Manager is stopped, and restarted, it will no
718 * longer have the latest topology. Hence, a request is sent here.
720 * @param containerName
724 public void requestRefresh(String containerName) {
725 // wake up a bulk update thread and exit
726 // the thread will execute the bulkUpdate()
727 bulkNotifyQ.add(containerName);
731 * Reading the current topology database, the method will replay all the
732 * edge updates for the ITopologyServiceShimListener instance in the given
733 * container, which will in turn publish them toward SAL.
735 * @param containerName
737 private void TopologyBulkUpdate(String containerName) {
738 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
740 logger.debug("Try bulk update for container:{}", containerName);
741 edgePropMap = edgeMap.get(containerName);
742 if (edgePropMap == null) {
743 logger.debug("No edges known for container:{}", containerName);
746 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
748 if (topologServiceShimListener == null) {
749 logger.debug("No topology service shim listener for container:{}",
754 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
755 for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
756 if (edgeProps != null) {
758 teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
759 .getRight(), UpdateType.ADDED));
760 logger.trace("Add edge {}", edgeProps.getLeft());
764 topologServiceShimListener.edgeUpdate(teuList);
766 logger.debug("Sent {} updates", i);
// NOTE(review): no handling of node updates is visible for this callback -
// confirm it is intentionally a no-op.
770 public void updateNode(Node node, UpdateType type, Set<Property> props) {
774 public void updateNodeConnector(NodeConnector nodeConnector,
775 UpdateType type, Set<Property> props) {
776 List<String> containers = new ArrayList<String>();
777 List<String> conList = this.containerMap.get(nodeConnector);
779 containers.add(GlobalConstants.DEFAULT.toString());
780 if (conList != null) {
781 containers.addAll(conList);
792 boolean rmEdge = false;
793 for (Property prop : props) {
794 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
795 || ((prop instanceof State) && (((State) prop)
796 .getValue() != State.EDGE_UP))) {
798 * If port admin down or link down, remove the edges
799 * associated with the port
807 for (String cName : containers) {
808 removeNodeConnector(cName, nodeConnector);
813 for (String cName : containers) {
814 removeNodeConnector(cName, nodeConnector);