2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.List;
15 import java.util.Timer;
16 import java.util.TimerTask;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CopyOnWriteArrayList;
21 import java.util.concurrent.LinkedBlockingQueue;
23 import org.apache.commons.lang3.tuple.ImmutablePair;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.eclipse.osgi.framework.console.CommandInterpreter;
26 import org.eclipse.osgi.framework.console.CommandProvider;
27 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
28 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
29 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
31 import org.osgi.framework.BundleContext;
32 import org.osgi.framework.FrameworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import org.opendaylight.controller.sal.core.Bandwidth;
37 import org.opendaylight.controller.sal.core.Config;
38 import org.opendaylight.controller.sal.core.ContainerFlow;
39 import org.opendaylight.controller.sal.core.Edge;
40 import org.opendaylight.controller.sal.core.IContainerListener;
41 import org.opendaylight.controller.sal.core.Node;
42 import org.opendaylight.controller.sal.core.NodeConnector;
43 import org.opendaylight.controller.sal.core.Property;
44 import org.opendaylight.controller.sal.core.State;
45 import org.opendaylight.controller.sal.core.UpdateType;
46 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
50 * The class describes a shim layer that relays the topology events from
51 * OpenFlow core to various listeners. The notifications are filtered based on
52 * container configurations.
54 public class TopologyServiceShim implements IDiscoveryService,
55 IContainerListener, CommandProvider, IRefreshInternalProvider,
56 IInventoryShimExternalListener {
// Class-wide SLF4J logger.
57 protected static final Logger logger = LoggerFactory
58 .getLogger(TopologyServiceShim.class);
// Registered topology listeners, keyed by container name.
59 private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
// For each node connector, the names of the containers it has been added to
// (maintained by nodeConnectorUpdated).
60 private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
// Edge database: container name -> (edge's tail node connector -> edge plus
// its property set). Populated and pruned by notifyEdge.
61 private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
// Pending per-container edge notifications and the thread that drains them.
63 private BlockingQueue<NotifyEntry> notifyQ;
64 private Thread notifyThread;
// Container names waiting for a full topology replay and the thread that
// serves those requests.
65 private BlockingQueue<String> bulkNotifyQ;
66 private Thread ofPluginTopoBulkUpdate;
// NOTE(review): presumably tells the worker threads to exit on shutdown; its
// readers and writers are not visible in this part of the file - confirm.
67 private volatile Boolean shuttingDown = false;
// Statistics manager queried by pollTxBitRates for per-port transmit rates.
68 private IOFStatisticsManager statsMgr;
// Timer and task used for the periodic transmit-rate poll.
69 private Timer pollTimer;
70 private TimerTask txRatePoller;
// Bandwidth-utilization notifications and the thread that drains them.
71 private Thread bwUtilNotifyThread;
72 private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
// Connectors already reported as over-utilized, so each threshold crossing is
// notified only once.
73 private List<NodeConnector> connectorsOverUtilized;
// Fraction of an edge's bandwidth above which the transmit rate counts as
// over-utilization; changeable at runtime through the bwfactor console command.
74 private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
// The edge and its property set carried by one queued notification.
79 Pair<Edge, Set<Property>> edgeProps;
// Constructor: records the target container name and the edge/properties
// pair. TopologyNotify also reads a `type` member from these entries.
82 NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
84 this.container = container;
85 this.edgeProps = edgeProps;
// Worker that takes NotifyEntry items from a queue and forwards each one as
// an edgeUpdate call to the listener registered for the entry's container.
90 class TopologyNotify implements Runnable {
91 private final BlockingQueue<NotifyEntry> notifyQ;
93 TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
94 this.notifyQ = notifyQ;
// Blocks until a notification is available.
100 NotifyEntry entry = notifyQ.take();
// Resolve the listener by container name.
// NOTE(review): no null check on the lookup result is visible before it is
// dereferenced; with no listener registered for the container this would
// throw NullPointerException - confirm the generic catch below is the
// intended handling.
102 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
103 .get(entry.container);
104 topologServiceShimListener.edgeUpdate(
105 entry.edgeProps.getLeft(), entry.type,
106 entry.edgeProps.getRight());
109 } catch (InterruptedException e1) {
110 logger.warn("TopologyNotify interrupted {}", e1.getMessage());
114 } catch (Exception e2) {
// Value holder for one bandwidth-utilization change: the affected node
// connector plus an UpdateType (ADDED is treated as "over-utilized" by
// BwUtilizationNotify).
121 class UtilizationUpdate {
122 NodeConnector connector;
125 UtilizationUpdate(NodeConnector connector, UpdateType type) {
126 this.connector = connector;
// Worker that takes UtilizationUpdate items from a queue and, for every
// container holding the affected edge, tells that container's listener that
// the edge is over-utilized or back to normal.
131 class BwUtilizationNotify implements Runnable {
132 private final BlockingQueue<UtilizationUpdate> notifyQ;
134 BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
135 this.notifyQ = notifyQ;
// Blocks until a utilization update is queued.
141 UtilizationUpdate update = notifyQ.take();
142 NodeConnector connector = update.connector;
// Walk every container that currently has edges recorded in edgeMap.
143 Set<String> containerList = edgeMap.keySet();
144 for (String container : containerList) {
// NOTE(review): the per-container map may hold no entry for this connector,
// in which case get(connector) returns null and getLeft() throws
// NullPointerException - confirm whether the generic catch below is meant to
// absorb that (it would also skip the remaining containers).
145 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
147 Edge edge = edgePropsMap.get(connector).getLeft();
// Only report the edge whose tail end is this connector.
148 if (edge.getTailNodeConnector().equals(connector)) {
// ADDED is reported as over-utilization; the back-to-normal call covers the
// other case.
149 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
151 if (update.type == UpdateType.ADDED) {
152 topologServiceShimListener
153 .edgeOverUtilized(edge);
155 topologServiceShimListener
156 .edgeUtilBackToNormal(edge);
160 } catch (InterruptedException e1) {
162 "Edge Bandwidth Utilization Notify Thread interrupted {}",
167 } catch (Exception e2) {
175 * Function called by the dependency manager when all the required
176 * dependencies are satisfied
// Initialization: creates the queues and worker threads (none is started
// here; start-up code starts them), prepares the transmit-rate poll timer and
// registers the console commands.
180 logger.trace("Init called");
181 connectorsOverUtilized = new ArrayList<NodeConnector>();
// Edge notification queue and its consumer.
182 notifyQ = new LinkedBlockingQueue<NotifyEntry>();
183 notifyThread = new Thread(new TopologyNotify(notifyQ));
// Bandwidth-utilization queue and its consumer.
184 bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
185 bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
// Bulk replay requests: the thread blocks on the queue and replays the
// topology of each requested container through TopologyBulkUpdate.
186 bulkNotifyQ = new LinkedBlockingQueue<String>();
187 ofPluginTopoBulkUpdate = new Thread(new Runnable() {
192 String containerName = bulkNotifyQ.take();
193 logger.debug("Bulk Notify container:{}", containerName);
194 TopologyBulkUpdate(containerName);
195 } catch (InterruptedException e) {
196 logger.warn("Topology Bulk update thread interrupted");
203 }, "Topology Bulk Update");
205 // Initialize node connector tx bit rate poller timer
206 pollTimer = new Timer();
207 txRatePoller = new TimerTask() {
// Expose the console commands of this class (see registerWithOSGIConsole).
214 registerWithOSGIConsole();
218 * Continuously polls the transmit bit rate for all the node connectors from
219 * the statistics manager and triggers the warning notification upward when the
220 * transmit rate is above a threshold which is a percentage of the edge bandwidth.
// Compares the transmit rate of each node connector that has an edge in the
// default container against bwThresholdFactor * edge bandwidth, and queues a
// UtilizationUpdate when a connector crosses the threshold in either
// direction.
223 protected void pollTxBitRates() {
// Only the default container's edges are examined.
224 Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
225 .get(GlobalConstants.DEFAULT.toString());
226 if (globalContainerEdges == null) {
230 for (NodeConnector connector : globalContainerEdges.keySet()) {
231 // Skip if node connector belongs to production switch
232 if (connector.getType().equals(
233 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
// NOTE(review): notifyEdge keys these maps by the edge's *tail* connector, so
// "head" in the comment below looks inaccurate - confirm.
237 // Get edge for which this node connector is head
238 Pair<Edge, Set<Property>> props = this.edgeMap.get(
239 GlobalConstants.DEFAULT.toString()).get(connector);
240 // On switch mgr restart the props get reset
244 Set<Property> propSet = props.getRight();
245 if (propSet == null) {
// Pick the edge bandwidth out of the property set.
250 for (Property prop : propSet) {
251 if (prop instanceof Bandwidth) {
252 bw = ((Bandwidth) prop).getValue();
257 // Skip if agent did not provide a bandwidth info for the edge
262 // Compare bandwidth usage
// NOTE(review): the casts assume the node ID is a Long and the connector ID
// is a Short - confirm for every connector type that can reach this point.
263 Long switchId = (Long) connector.getNode().getID();
264 Short port = (Short) connector.getID();
265 float rate = statsMgr.getTransmitRate(switchId, port);
266 if (rate > bwThresholdFactor * bw) {
// Above the threshold: report only the first time the connector is seen there.
267 if (!connectorsOverUtilized.contains(connector)) {
268 connectorsOverUtilized.add(connector);
269 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
// Not above the threshold: a previously flagged connector is reported as
// REMOVED and dropped from the list.
273 if (connectorsOverUtilized.contains(connector)) {
274 connectorsOverUtilized.remove(connector);
275 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
276 UpdateType.REMOVED));
284 * Function called by the dependency manager when at least one dependency
285 * becomes unsatisfied or when the component is shutting down, for example
286 * because the bundle is being stopped.
// Trace marker for the destroy lifecycle callback.
290 logger.trace("DESTROY called!");
296 * Function called by dependency manager after "init ()" is called and after
297 * the services provided by the class are registered in the service registry
// Start-up: launches the three worker threads created during initialization
// and schedules the transmit-rate poller.
301 logger.trace("START called!");
302 notifyThread.start();
303 bwUtilNotifyThread.start();
304 ofPluginTopoBulkUpdate.start();
// First poll after 10000 ms, then at a fixed rate every 5000 ms.
305 pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
309 * Function called by the dependency manager before the services exported by
310 * the component are unregistered; this will be followed by a "destroy ()" call.
// Stop: interrupts the edge notification thread.
// NOTE(review): no interrupt of bwUtilNotifyThread or ofPluginTopoBulkUpdate,
// and no cancel of pollTimer, is visible here - confirm whether those are
// stopped elsewhere.
315 logger.trace("STOP called!");
317 notifyThread.interrupt();
// Registers listener `s` for the container named by the "containerName"
// service property. Missing properties or a missing name is logged as an
// error. An existing registration for the same container is kept: the put
// only happens when the name is not yet a key.
320 void setTopologyServiceShimListener(Map<?, ?> props,
321 ITopologyServiceShimListener s) {
323 logger.error("Didn't receive the service properties");
326 String containerName = (String) props.get("containerName");
327 if (containerName == null) {
328 logger.error("containerName not supplied");
331 if ((this.topologyServiceShimListeners != null)
332 && !this.topologyServiceShimListeners
333 .containsKey(containerName)) {
334 this.topologyServiceShimListeners.put(containerName, s);
335 logger.trace("Added topologyServiceShimListener for container: {}",
// Unregisters the listener for the container named by the "containerName"
// service property. Missing properties or a missing name is logged as an
// error. The entry is removed only when the registered listener equals the
// compared instance (presumably `s` - confirm).
340 void unsetTopologyServiceShimListener(Map<?, ?> props,
341 ITopologyServiceShimListener s) {
343 logger.error("Didn't receive the service properties");
346 String containerName = (String) props.get("containerName");
347 if (containerName == null) {
348 logger.error("containerName not supplied");
351 if ((this.topologyServiceShimListeners != null)
352 && this.topologyServiceShimListeners.containsKey(containerName)
353 && this.topologyServiceShimListeners.get(containerName).equals(
355 this.topologyServiceShimListeners.remove(containerName);
356 logger.trace("Removed topologyServiceShimListener for container: {}",
// Injection point for the statistics manager.
// NOTE(review): presumably stores `s` in statsMgr, mirroring
// unsetStatisticsManager - confirm.
361 void setStatisticsManager(IOFStatisticsManager s) {
365 void unsetStatisticsManager(IOFStatisticsManager s) {
366 if (this.statsMgr == s) {
367 this.statsMgr = null;
// Removes, within one container, the edge whose tail is `nodeConnector` and
// then the reverse edge, i.e. the one stored under the first edge's head
// connector. Each removal goes through notifyEdge with UpdateType.REMOVED and
// null properties, which updates edgeMap and queues the notification.
371 private void removeNodeConnector(String container,
372 NodeConnector nodeConnector) {
373 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
375 if (edgePropsMap == null) {
379 // Remove edge in one direction
380 Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
381 if (edgeProps == null) {
384 notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
386 // Remove edge in another direction
// edgeProps still refers to the edge removed above; its head connector is the
// key under which the opposite-direction edge is stored.
387 edgeProps = edgePropsMap
388 .get(edgeProps.getLeft().getHeadNodeConnector());
389 if (edgeProps == null) {
392 notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
// Applies one edge update to the edge database of `container` and queues a
// NotifyEntry for that container's listener. Entries are keyed by the edge's
// tail node connector.
// NOTE(review): which UpdateType values select the insert path, the removal
// path and the "invalid" log is decided on lines not visible here - confirm.
395 private void notifyEdge(String container, Edge edge, UpdateType type,
396 Set<Property> props) {
397 ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
399 NodeConnector src = edge.getTailNodeConnector();
400 Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
// Insert path: create the container's map on first use, skip the update when
// an equal entry is already stored, otherwise store it.
406 if (edgePropsMap == null) {
407 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
409 if (edgePropsMap.containsKey(src)
410 && edgePropsMap.get(src).equals(edgeProps)) {
411 // Entry already exists. Return here.
415 edgePropsMap.put(src, edgeProps);
416 edgeMap.put(container, edgePropsMap);
// Removal path: drop the entry, and drop the container's map once it is empty.
419 if (edgePropsMap != null) {
420 edgePropsMap.remove(src);
421 if (edgePropsMap.isEmpty()) {
422 edgeMap.remove(container);
424 edgeMap.put(container, edgePropsMap);
// Unsupported update type.
429 logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
430 type, edge, container);
// Hand the update to the notification thread.
434 notifyQ.add(new NotifyEntry(container, edgeProps, type));
436 logger.debug("notifyEdge: {} Edge {} in container {}",
437 type, edge, container);
441 public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
442 if ((edge == null) || (type == null)) {
446 // Notify default container
447 notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
449 // Notify the corresponding containers
450 List<String> containers = getEdgeContainers(edge);
451 if (containers != null) {
452 for (String container : containers) {
453 notifyEdge(container, edge, type, props);
459 * Return a list of containers the edge associated with
461 private List<String> getEdgeContainers(Edge edge) {
462 NodeConnector src = edge.getTailNodeConnector(), dst = edge
463 .getHeadNodeConnector();
465 if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
466 /* Find the common containers for both ends */
467 List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
468 .get(dst), cmnContainers = null;
469 if ((srcContainers != null) && (dstContainers != null)) {
470 cmnContainers = new ArrayList<String>(srcContainers);
471 cmnContainers.retainAll(dstContainers);
473 return cmnContainers;
476 * If the neighbor is part of a monitored production network, get
477 * the containers that the edge port belongs to
479 return this.containerMap.get(dst);
// Callback for a container tag change on a node.
// NOTE(review): no handling is visible for this event - presumably an
// intentional no-op; confirm.
484 public void tagUpdated(String containerName, Node n, short oldTag,
485 short newTag, UpdateType t) {
// Callback for a container flow change.
// NOTE(review): no handling is visible for this event - presumably an
// intentional no-op; confirm.
489 public void containerFlowUpdated(String containerName,
490 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
// Tracks which containers node connector `p` belongs to. On the add path the
// container name is appended to the connector's list if absent. On the
// removal path it is dropped from the list and the connector's edges in that
// container are removed through removeNodeConnector. The list is then written
// back to containerMap, or the entry is deleted when the list is empty.
// NOTE(review): the UpdateType dispatch and the conditions under which
// updateMap is set are on lines not visible here - confirm.
494 public void nodeConnectorUpdated(String containerName, NodeConnector p,
496 if (this.containerMap == null) {
497 logger.error("containerMap is NULL");
500 List<String> containers = this.containerMap.get(p);
// First time this connector is seen: start with an empty list.
501 if (containers == null) {
502 containers = new CopyOnWriteArrayList<String>();
504 boolean updateMap = false;
507 if (!containers.contains(containerName)) {
508 containers.add(containerName);
513 if (containers.contains(containerName)) {
514 containers.remove(containerName);
516 removeNodeConnector(containerName, p);
523 if (containers.isEmpty()) {
524 // Do cleanup to reduce memory footprint if no
525 // elements to be tracked
526 this.containerMap.remove(p);
528 this.containerMap.put(p, containers);
// Callback for a container mode change.
// NOTE(review): no handling is visible for this event - presumably an
// intentional no-op; confirm.
534 public void containerModeUpdated(UpdateType t) {
// Registers this object as a CommandProvider service through the bundle that
// loaded this class, so that its console commands (the `_`-prefixed methods
// and getHelp) are exposed.
538 private void registerWithOSGIConsole() {
539 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
541 bundleContext.registerService(CommandProvider.class.getName(), this,
546 public String getHelp() {
547 StringBuffer help = new StringBuffer();
548 help.append("---Topology Service Shim---\n");
549 help.append("\t pem [container] - Print edgeMap entries");
550 help.append(" for a given container\n");
551 return help.toString();
// Console command "pem [container]": prints every edge stored for the given
// container (the default container when no argument is supplied) together
// with its bandwidth, followed by the number of edges.
554 public void _pem(CommandInterpreter ci) {
555 String container = ci.nextArgument();
556 if (container == null) {
557 container = GlobalConstants.DEFAULT.toString();
560 ci.println("Container: " + container);
561 ci.println(" Edge Bandwidth");
563 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
565 if (edgePropsMap == null) {
569 for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
570 if (edgeProps == null) {
// NOTE(review): pollTxBitRates guards against a null property set, but no
// such check is visible before this iteration - confirm getRight() cannot be
// null here.
575 for (Property prop : edgeProps.getRight()) {
// The bandwidth property is recognised by name, then cast to Bandwidth.
576 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
577 bw = ((Bandwidth) prop).getValue();
581 ci.println(edgeProps.getLeft() + " " + bw);
583 ci.println("Total number of Edges: " + count);
586 public void _bwfactor(CommandInterpreter ci) {
587 String factorString = ci.nextArgument();
588 if (factorString == null) {
589 ci.println("Bw threshold: " + this.bwThresholdFactor);
590 ci.println("Insert a non null bw threshold");
593 bwThresholdFactor = Float.parseFloat(factorString);
594 ci.println("New Bw threshold: " + this.bwThresholdFactor);
598 * This method will trigger topology updates to be sent toward SAL. SAL then
599 * pushes the updates to ALL the applications that have registered as
600 * listeners for this service. SAL has no way of knowing which application
601 * requested for the refresh.
603 * As an example of this case, is stopping and starting the Topology
604 * Manager. When the topology Manager is stopped, and restarted, it will no
605 * longer have the latest topology. Hence, a request is sent here.
607 * @param containerName
611 public void requestRefresh(String containerName) {
612 // wake up a bulk update thread and exit
613 // the thread will execute the bulkUpdate()
614 bulkNotifyQ.add(containerName);
618 * Reading the current topology database, the method will replay all the
619 * edge updates for the ITopologyServiceShimListener instance in the given
620 * container, which will in turn publish them toward SAL.
622 * @param containerName
// Replays every edge stored for `containerName` to that container's listener
// as an UpdateType.ADDED edgeUpdate. A container with no stored edges, or
// with no registered listener, is reported in a debug message.
624 private void TopologyBulkUpdate(String containerName) {
625 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
627 logger.debug("Try bulk update for container:{}", containerName);
628 edgePropMap = edgeMap.get(containerName);
629 if (edgePropMap == null) {
630 logger.debug("No edges known for container:{}", containerName);
633 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
635 if (topologServiceShimListener == null) {
636 logger.debug("No topology service shim listener for container:{}",
// Null map values are skipped; every other entry is replayed as ADDED.
641 for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
642 if (edgeProps != null) {
644 logger.trace("Add edge {}", edgeProps.getLeft());
645 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
646 UpdateType.ADDED, edgeProps.getRight());
649 logger.debug("Sent {} updates", i);
// Inventory callback for node updates.
// NOTE(review): no handling is visible for this event - presumably an
// intentional no-op; confirm.
653 public void updateNode(Node node, UpdateType type, Set<Property> props) {
657 public void updateNodeConnector(NodeConnector nodeConnector,
658 UpdateType type, Set<Property> props) {
659 List<String> containers = new ArrayList<String>();
660 List<String> conList = this.containerMap.get(nodeConnector);
662 containers.add(GlobalConstants.DEFAULT.toString());
663 if (conList != null) {
664 containers.addAll(conList);
675 boolean rmEdge = false;
676 for (Property prop : props) {
677 if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
678 || ((prop instanceof State) && (((State) prop)
679 .getValue() != State.EDGE_UP))) {
681 * If port admin down or link down, remove the edges
682 * associated with the port
690 for (String cName : containers) {
691 removeNodeConnector(cName, nodeConnector);
696 for (String cName : containers) {
697 removeNodeConnector(cName, nodeConnector);