2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
16 import java.util.Timer;
17 import java.util.TimerTask;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.LinkedBlockingQueue;
24 import org.apache.commons.lang3.tuple.ImmutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.eclipse.osgi.framework.console.CommandInterpreter;
27 import org.eclipse.osgi.framework.console.CommandProvider;
28 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
29 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
30 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
31 import org.osgi.framework.BundleContext;
32 import org.osgi.framework.FrameworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import org.opendaylight.controller.sal.core.Bandwidth;
37 import org.opendaylight.controller.sal.core.ContainerFlow;
38 import org.opendaylight.controller.sal.core.Edge;
39 import org.opendaylight.controller.sal.core.IContainerListener;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
45 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 * The class describes a shim layer that relays the topology events from
49 * OpenFlow core to various listeners. The notifications are filtered based on
50 * container configurations.
52 public class TopologyServiceShim implements IDiscoveryService,
53 IContainerListener, CommandProvider, IRefreshInternalProvider {
54 protected static final Logger logger = LoggerFactory
55 .getLogger(TopologyServiceShim.class);
56 private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
57 private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
58 private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
60 private BlockingQueue<NotifyEntry> notifyQ;
61 private Thread notifyThread;
62 private BlockingQueue<String> bulkNotifyQ;
63 private Thread ofPluginTopoBulkUpdate;
64 private volatile Boolean shuttingDown = false;
65 private IOFStatisticsManager statsMgr;
66 private Timer pollTimer;
67 private TimerTask txRatePoller;
68 private Thread bwUtilNotifyThread;
69 private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
70 private List<NodeConnector> connectorsOverUtilized;
71 private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
76 Pair<Edge, Set<Property>> edgeProps;
79 NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
81 this.container = container;
82 this.edgeProps = edgeProps;
87 class TopologyNotify implements Runnable {
88 private final BlockingQueue<NotifyEntry> notifyQ;
90 TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
91 this.notifyQ = notifyQ;
97 NotifyEntry entry = notifyQ.take();
99 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
100 .get(entry.container);
101 topologServiceShimListener.edgeUpdate(
102 entry.edgeProps.getLeft(), entry.type,
103 entry.edgeProps.getRight());
106 } catch (InterruptedException e1) {
107 logger.warn("TopologyNotify interrupted", e1.getMessage());
111 } catch (Exception e2) {
112 e2.printStackTrace();
118 class UtilizationUpdate {
119 NodeConnector connector;
122 UtilizationUpdate(NodeConnector connector, UpdateType type) {
123 this.connector = connector;
128 class BwUtilizationNotify implements Runnable {
129 private final BlockingQueue<UtilizationUpdate> notifyQ;
131 BwUtilizationNotify(BlockingQueue<UtilizationUpdate> notifyQ) {
132 this.notifyQ = notifyQ;
138 UtilizationUpdate update = notifyQ.take();
139 NodeConnector connector = update.connector;
140 Set<String> containerList = edgeMap.keySet();
141 for (String container : containerList) {
142 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
144 Edge edge = edgePropsMap.get(connector).getLeft();
145 if (edge.getTailNodeConnector().equals(connector)) {
146 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
148 if (update.type == UpdateType.ADDED) {
149 topologServiceShimListener
150 .edgeOverUtilized(edge);
152 topologServiceShimListener
153 .edgeUtilBackToNormal(edge);
157 } catch (InterruptedException e1) {
159 "Edge Bandwidth Utilization Notify Thread interrupted",
164 } catch (Exception e2) {
165 e2.printStackTrace();
172 * Function called by the dependency manager when all the required
173 * dependencies are satisfied
177 logger.trace("Init called");
178 connectorsOverUtilized = new ArrayList<NodeConnector>();
179 notifyQ = new LinkedBlockingQueue<NotifyEntry>();
180 notifyThread = new Thread(new TopologyNotify(notifyQ));
181 bwUtilNotifyQ = new LinkedBlockingQueue<UtilizationUpdate>();
182 bwUtilNotifyThread = new Thread(new BwUtilizationNotify(bwUtilNotifyQ));
183 bulkNotifyQ = new LinkedBlockingQueue<String>();
184 ofPluginTopoBulkUpdate = new Thread(new Runnable() {
189 String containerName = bulkNotifyQ.take();
190 logger.debug("Bulk Notify container:{}", containerName);
191 TopologyBulkUpdate(containerName);
192 } catch (InterruptedException e) {
193 logger.warn("Topology Bulk update thread interrupted");
200 }, "Topology Bulk Update");
202 // Initialize node connector tx bit rate poller timer
203 pollTimer = new Timer();
204 txRatePoller = new TimerTask() {
211 registerWithOSGIConsole();
215 * Continuously polls the transmit bit rate for all the node connectors from
216 * the statistics manager and triggers the warning notification upward when
217 * the transmit rate is above a threshold which is a percentage of the edge
220 protected void pollTxBitRates() {
221 Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
222 .get(GlobalConstants.DEFAULT.toString());
223 if (globalContainerEdges == null) {
227 for (NodeConnector connector : globalContainerEdges.keySet()) {
228 // Skip if node connector belongs to production switch
229 if (connector.getType().equals(
230 NodeConnector.NodeConnectorIDType.PRODUCTION)) {
234 // Get edge for which this node connector is head
235 Pair<Edge, Set<Property>> props = this.edgeMap.get(
236 GlobalConstants.DEFAULT.toString()).get(connector);
237 // On switch mgr restart the props get reset
241 Set<Property> propSet = props.getRight();
242 if (propSet == null) {
247 for (Property prop : propSet) {
248 if (prop instanceof Bandwidth) {
249 bw = ((Bandwidth) prop).getValue();
254 // Skip if agent did not provide a bandwidth info for the edge
259 // Compare bandwidth usage
260 Long switchId = (Long) connector.getNode().getID();
261 Short port = (Short) connector.getID();
262 float rate = statsMgr.getTransmitRate(switchId, port);
263 if (rate > bwThresholdFactor * bw) {
264 if (!connectorsOverUtilized.contains(connector)) {
265 connectorsOverUtilized.add(connector);
266 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
270 if (connectorsOverUtilized.contains(connector)) {
271 connectorsOverUtilized.remove(connector);
272 this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
273 UpdateType.REMOVED));
281 * Function called by the dependency manager when at least one dependency
282 * becomes unsatisfied or when the component is shutting down because, for
283 * example, the bundle is being stopped.
287 logger.trace("DESTROY called!");
293 * Function called by dependency manager after "init ()" is called and after
294 * the services provided by the class are registered in the service registry
298 logger.trace("START called!");
299 notifyThread.start();
300 bwUtilNotifyThread.start();
301 ofPluginTopoBulkUpdate.start();
302 pollTimer.scheduleAtFixedRate(txRatePoller, 10000, 5000);
306 * Function called by the dependency manager before the services exported by
307 * the component are unregistered, this will be followed by a "destroy ()"
312 logger.trace("STOP called!");
314 notifyThread.interrupt();
317 void setTopologyServiceShimListener(Map<?, ?> props,
318 ITopologyServiceShimListener s) {
320 logger.error("Didn't receive the service properties");
323 String containerName = (String) props.get("containerName");
324 if (containerName == null) {
325 logger.error("containerName not supplied");
328 if ((this.topologyServiceShimListeners != null)
329 && !this.topologyServiceShimListeners
330 .containsKey(containerName)) {
331 this.topologyServiceShimListeners.put(containerName, s);
332 logger.trace("Added topologyServiceShimListener for container:"
337 void unsetTopologyServiceShimListener(Map<?, ?> props,
338 ITopologyServiceShimListener s) {
340 logger.error("Didn't receive the service properties");
343 String containerName = (String) props.get("containerName");
344 if (containerName == null) {
345 logger.error("containerName not supplied");
348 if ((this.topologyServiceShimListeners != null)
349 && this.topologyServiceShimListeners.containsKey(containerName)
350 && this.topologyServiceShimListeners.get(containerName).equals(
352 this.topologyServiceShimListeners.remove(containerName);
353 logger.trace("Removed topologyServiceShimListener for container: "
358 void setStatisticsManager(IOFStatisticsManager s) {
362 void unsetStatisticsManager(IOFStatisticsManager s) {
363 if (this.statsMgr == s) {
364 this.statsMgr = null;
368 private void removeNodeConnector(String container,
369 NodeConnector nodeConnector) {
370 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
372 if (edgePropsMap == null) {
376 // Remove edge in one direction
377 Pair<Edge, Set<Property>> edgeProps = edgePropsMap.get(nodeConnector);
378 if (edgeProps == null) {
381 notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
383 // Remove edge in another direction
384 edgeProps = edgePropsMap
385 .get(edgeProps.getLeft().getHeadNodeConnector());
386 if (edgeProps == null) {
389 notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
392 private void notifyEdge(String container, Edge edge, UpdateType type,
393 Set<Property> props) {
394 ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
396 NodeConnector src = edge.getTailNodeConnector();
397 Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
403 if (edgePropsMap == null) {
404 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
406 if (edgePropsMap.containsKey(src)
407 && edgePropsMap.get(src).equals(edgeProps)) {
408 // Entry already exists. Return here.
412 edgePropsMap.put(src, edgeProps);
413 edgeMap.put(container, edgePropsMap);
416 if (edgePropsMap != null) {
417 edgePropsMap.remove(src);
418 if (edgePropsMap.isEmpty()) {
419 edgeMap.remove(container);
421 edgeMap.put(container, edgePropsMap);
426 logger.debug("Invalid " + type + " Edge " + edge
427 + " in container {}", container);
431 notifyQ.add(new NotifyEntry(container, edgeProps, type));
433 logger.trace(type + " Edge " + edge + " in container {}", container);
437 public void notifyEdge(Edge edge, UpdateType type, Set<Property> props) {
438 if ((edge == null) || (type == null)) {
442 // Notify default container
443 notifyEdge(GlobalConstants.DEFAULT.toString(), edge, type, props);
445 // Notify the corresponding containers
446 List<String> containers = getEdgeContainers(edge);
447 if (containers != null) {
448 for (String container : containers) {
449 notifyEdge(container, edge, type, props);
455 * Return a list of containers the edge associated with
457 private List<String> getEdgeContainers(Edge edge) {
458 NodeConnector src = edge.getTailNodeConnector(), dst = edge
459 .getHeadNodeConnector();
461 if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
462 /* Find the common containers for both ends */
463 List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
464 .get(dst), cmnContainers = null;
465 if ((srcContainers != null) && (dstContainers != null)) {
466 cmnContainers = new ArrayList<String>(srcContainers);
467 cmnContainers.retainAll(dstContainers);
469 return cmnContainers;
472 * If the neighbor is part of a monitored production network, get
473 * the containers that the edge port belongs to
475 return this.containerMap.get(dst);
480 public void tagUpdated(String containerName, Node n, short oldTag,
481 short newTag, UpdateType t) {
485 public void containerFlowUpdated(String containerName,
486 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
490 public void nodeConnectorUpdated(String containerName, NodeConnector p,
492 if (this.containerMap == null) {
493 logger.error("containerMap is NULL");
496 List<String> containers = this.containerMap.get(p);
497 if (containers == null) {
498 containers = new CopyOnWriteArrayList<String>();
500 boolean updateMap = false;
503 if (!containers.contains(containerName)) {
504 containers.add(containerName);
509 if (containers.contains(containerName)) {
510 containers.remove(containerName);
512 removeNodeConnector(containerName, p);
519 if (containers.isEmpty()) {
520 // Do cleanup to reduce memory footprint if no
521 // elements to be tracked
522 this.containerMap.remove(p);
524 this.containerMap.put(p, containers);
530 public void containerModeUpdated(UpdateType t) {
534 private void registerWithOSGIConsole() {
535 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
537 bundleContext.registerService(CommandProvider.class.getName(), this,
542 public String getHelp() {
543 StringBuffer help = new StringBuffer();
544 help.append("---Topology Service Shim---\n");
545 help.append("\t pem [container] - Print edgeMap entries");
546 help.append(" for a given container\n");
547 return help.toString();
550 public void _pem(CommandInterpreter ci) {
551 String container = ci.nextArgument();
552 if (container == null) {
553 container = GlobalConstants.DEFAULT.toString();
556 ci.println("Container: " + container);
557 ci.println(" Edge Bandwidth");
559 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
561 if (edgePropsMap == null) {
565 for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
566 if (edgeProps == null) {
571 for (Property prop : edgeProps.getRight()) {
572 if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
573 bw = ((Bandwidth) prop).getValue();
577 ci.println(edgeProps.getLeft() + " " + bw);
579 ci.println("Total number of Edges: " + count);
582 public void _bwfactor(CommandInterpreter ci) {
583 String factorString = ci.nextArgument();
584 if (factorString == null) {
585 ci.println("Bw threshold: " + this.bwThresholdFactor);
586 ci.println("Insert a non null bw threshold");
589 bwThresholdFactor = Float.parseFloat(factorString);
590 ci.println("New Bw threshold: " + this.bwThresholdFactor);
594 * This method will trigger topology updates to be sent toward SAL. SAL then
595 * pushes the updates to ALL the applications that have registered as
596 * listeners for this service. SAL has no way of knowing which application
597 * requested for the refresh.
599 * As an example of this case, is stopping and starting the Topology
600 * Manager. When the topology Manager is stopped, and restarted, it will no
601 * longer have the latest topology. Hence, a request is sent here.
603 * @param containerName
607 public void requestRefresh(String containerName) {
608 // wake up a bulk update thread and exit
609 // the thread will execute the bulkUpdate()
610 bulkNotifyQ.add(containerName);
614 * Reading the current topology database, the method will replay all the
615 * edge updates for the ITopologyServiceShimListener instance in the given
616 * container, which will in turn publish them toward SAL.
618 * @param containerName
620 private void TopologyBulkUpdate(String containerName) {
621 Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
623 logger.debug("Try bulk update for container:{}", containerName);
624 edgePropMap = edgeMap.get(containerName);
625 if (edgePropMap == null) {
626 logger.debug("No edges known for container:{}", containerName);
629 ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
631 if (topologServiceShimListener == null) {
632 logger.debug("No topology service shim listener for container:{}",
637 for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
638 if (edgeProps != null) {
640 logger.trace("Add edge {}", edgeProps.getLeft());
641 topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
642 UpdateType.ADDED, edgeProps.getRight());
645 logger.debug("Sent {} updates", i);