-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.eclipse.osgi.framework.console.CommandProvider;
+import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
import org.slf4j.LoggerFactory;
import org.opendaylight.controller.sal.core.Bandwidth;
+import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.Edge;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.State;
import org.opendaylight.controller.sal.core.UpdateType;
-import org.opendaylight.controller.sal.discovery.IDiscoveryService;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.sal.utils.GlobalConstants;
/**
- * The class describes a shim layer that relays the topology events from OpenFlow
- * core to various listeners. The notifications are filtered based on container
- * configurations.
+ * The class describes a shim layer that relays the topology events from
+ * OpenFlow core to various listeners. The notifications are filtered based on
+ * container configurations.
*/
-public class TopologyServiceShim implements IDiscoveryService,
- IContainerListener, CommandProvider, IRefreshInternalProvider {
+public class TopologyServiceShim implements IDiscoveryListener,
+ IContainerListener, CommandProvider, IRefreshInternalProvider,
+ IInventoryShimExternalListener {
protected static final Logger logger = LoggerFactory
.getLogger(TopologyServiceShim.class);
private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
private ConcurrentMap<NodeConnector, List<String>> containerMap = new ConcurrentHashMap<NodeConnector, List<String>>();
- private ConcurrentMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, Map<NodeConnector, Pair<Edge, Set<Property>>>>();
+ private ConcurrentMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>> edgeMap = new ConcurrentHashMap<String, ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>>>();
private BlockingQueue<NotifyEntry> notifyQ;
private Thread notifyThread;
private Thread bwUtilNotifyThread;
private BlockingQueue<UtilizationUpdate> bwUtilNotifyQ;
private List<NodeConnector> connectorsOverUtilized;
- private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link bandwidth
+ private float bwThresholdFactor = (float) 0.8; // Threshold = 80% of link
+ // bandwidth
class NotifyEntry {
String container;
- Pair<Edge, Set<Property>> edgeProps;
- UpdateType type;
+ List<TopoEdgeUpdate> teuList;
- NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
- UpdateType type) {
+ public NotifyEntry(String container, TopoEdgeUpdate teu) {
this.container = container;
- this.edgeProps = edgeProps;
- this.type = type;
+ this.teuList = new ArrayList<TopoEdgeUpdate>();
+ if (teu != null) {
+ this.teuList.add(teu);
+ }
+ }
+
+ public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
+ this.container = container;
+ this.teuList = new ArrayList<TopoEdgeUpdate>();
+ if (teuList != null) {
+ this.teuList.addAll(teuList);
+ }
}
}
class TopologyNotify implements Runnable {
private final BlockingQueue<NotifyEntry> notifyQ;
+ private NotifyEntry entry;
+ private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
+ private List<TopoEdgeUpdate> teuList;
+ private boolean notifyListeners;
TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
this.notifyQ = notifyQ;
public void run() {
while (true) {
try {
- NotifyEntry entry = notifyQ.take();
+ teuMap.clear();
+ notifyListeners = false;
+ while (!notifyQ.isEmpty()) {
+ entry = notifyQ.take();
+ teuList = teuMap.get(entry.container);
+ if (teuList == null) {
+ teuList = new ArrayList<TopoEdgeUpdate>();
+ }
+ // group all the updates together
+ teuList.addAll(entry.teuList);
+ teuMap.put(entry.container, teuList);
+ notifyListeners = true;
+ }
- ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
- .get(entry.container);
- topologServiceShimListener.edgeUpdate(entry.edgeProps
- .getLeft(), entry.type, entry.edgeProps.getRight());
+ if (notifyListeners) {
+ for (String container : teuMap.keySet()) {
+ // notify the listener
+ topologyServiceShimListeners.get(container)
+ .edgeUpdate(teuMap.get(container));
+ }
+ }
- entry = null;
+ Thread.sleep(100);
} catch (InterruptedException e1) {
- logger.warn("TopologyNotify interrupted", e1.getMessage());
+ logger.warn("TopologyNotify interrupted {}",
+ e1.getMessage());
if (shuttingDown) {
return;
}
} catch (Exception e2) {
- e2.printStackTrace();
+ logger.error("", e2);
}
}
}
try {
UtilizationUpdate update = notifyQ.take();
NodeConnector connector = update.connector;
- Set<String> containerList = edgeMap.keySet();//.get(connector);
+ Set<String> containerList = edgeMap.keySet();
for (String container : containerList) {
Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
}
}
} catch (InterruptedException e1) {
- logger
- .warn(
- "Edge Bandwidth Utilization Notify Thread interrupted",
- e1.getMessage());
+ logger.warn(
+ "Edge Bandwidth Utilization Notify Thread interrupted {}",
+ e1.getMessage());
if (shuttingDown) {
return;
}
} catch (Exception e2) {
- e2.printStackTrace();
+ logger.error("", e2);
}
}
}
}
/**
- * Continuously polls the transmit bit rate for all the node connectors
- * from statistics manager and trigger the warning notification upward
- * when the transmit rate is above a threshold which is a percentage of
- * the edge bandwidth
+ * Continuously polls the transmit bit rate for all the node connectors from
+ * statistics manager and trigger the warning notification upward when the
+ * transmit rate is above a threshold which is a percentage of the edge
+ * bandwidth
*/
protected void pollTxBitRates() {
Map<NodeConnector, Pair<Edge, Set<Property>>> globalContainerEdges = edgeMap
for (NodeConnector connector : globalContainerEdges.keySet()) {
// Skip if node connector belongs to production switch
- if (connector.getType() == NodeConnector.NodeConnectorIDType.PRODUCTION) {
+ if (connector.getType().equals(
+ NodeConnector.NodeConnectorIDType.PRODUCTION)) {
continue;
}
}
/**
- * Function called by the dependency manager when at least one
- * dependency become unsatisfied or when the component is shutting
- * down because for example bundle is being stopped.
+ * Function called by the dependency manager when at least one dependency
+ * become unsatisfied or when the component is shutting down because for
+ * example bundle is being stopped.
*
*/
void destroy() {
}
/**
- * Function called by dependency manager after "init ()" is called
- * and after the services provided by the class are registered in
- * the service registry
+ * Function called by dependency manager after "init ()" is called and after
+ * the services provided by the class are registered in the service registry
*
*/
void start() {
}
/**
- * Function called by the dependency manager before the services
- * exported by the component are unregistered, this will be
- * followed by a "destroy ()" calls
+ * Function called by the dependency manager before the services exported by
+ * the component are unregistered, this will be followed by a "destroy ()"
+ * calls
*
*/
void stop() {
return;
}
if ((this.topologyServiceShimListeners != null)
- && !this.topologyServiceShimListeners.containsKey(s)) {
+ && !this.topologyServiceShimListeners
+ .containsKey(containerName)) {
this.topologyServiceShimListeners.put(containerName, s);
- logger.trace("Added topologyServiceShimListener for container:"
- + containerName);
+ logger.trace("Added topologyServiceShimListener for container: {}",
+ containerName);
}
}
return;
}
if ((this.topologyServiceShimListeners != null)
- && this.topologyServiceShimListeners.containsKey(s)) {
+ && this.topologyServiceShimListeners.containsKey(containerName)
+ && this.topologyServiceShimListeners.get(containerName).equals(
+ s)) {
this.topologyServiceShimListeners.remove(containerName);
- logger.trace("Removed topologyServiceShimListener for container: "
- + containerName);
+ logger.trace(
+ "Removed topologyServiceShimListener for container: {}",
+ containerName);
}
}
private void removeNodeConnector(String container,
NodeConnector nodeConnector) {
+ List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
if (edgePropsMap == null) {
if (edgeProps == null) {
return;
}
- notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+ UpdateType.REMOVED));
// Remove edge in another direction
edgeProps = edgePropsMap
if (edgeProps == null) {
return;
}
- notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+ UpdateType.REMOVED));
+
+ // Update in one shot
+ notifyEdge(container, teuList);
}
- private void notifyEdge(String container, Edge edge, UpdateType type,
- Set<Property> props) {
- Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
+ /**
+ * Update local cache and return true if it needs to notify upper layer
+ * Topology listeners.
+ *
+ * @param container
+ * The network container
+ * @param edge
+ * The edge
+ * @param type
+ * The update type
+ * @param props
+ * The edge properties
+ * @return true if it needs to notify upper layer Topology listeners
+ */
+ private boolean updateLocalEdgeMap(String container, Edge edge,
+ UpdateType type, Set<Property> props) {
+ ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
NodeConnector src = edge.getTailNodeConnector();
Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
edge, props);
+ boolean rv = false;
switch (type) {
case ADDED:
case CHANGED:
if (edgePropsMap == null) {
- edgePropsMap = new HashMap<NodeConnector, Pair<Edge, Set<Property>>>();
+ edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
+ rv = true;
} else {
if (edgePropsMap.containsKey(src)
&& edgePropsMap.get(src).equals(edgeProps)) {
- // Entry already exists. Return here.
- return;
+ // Entry already exists. No update.
+ rv = false;
+ } else {
+ rv = true;
}
}
- edgePropsMap.put(src, edgeProps);
- edgeMap.put(container, edgePropsMap);
+ if (rv) {
+ edgePropsMap.put(src, edgeProps);
+ edgeMap.put(container, edgePropsMap);
+ }
break;
case REMOVED:
- if (edgePropsMap != null) {
+ if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
edgePropsMap.remove(src);
if (edgePropsMap.isEmpty()) {
edgeMap.remove(container);
} else {
edgeMap.put(container, edgePropsMap);
}
+ rv = true;
}
break;
default:
- logger.debug("Invalid " + type + " Edge " + edge
- + " in container {}", container);
+ logger.debug(
+ "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+
+ if (rv) {
+ logger.debug(
+ "notifyLocalEdgeMap: {} for Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+
+ return rv;
+ }
+
+ private void notifyEdge(String container, Edge edge, UpdateType type,
+ Set<Property> props) {
+ boolean notifyListeners;
+
+ // Update local cache
+ notifyListeners = updateLocalEdgeMap(container, edge, type, props);
+
+ // Prepare to update TopologyService
+ if (notifyListeners) {
+ notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
+ type)));
+ logger.debug("notifyEdge: {} Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+ }
+
+ private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
+ if (etuList == null) {
return;
}
- notifyQ.add(new NotifyEntry(container, edgeProps, type));
+ Edge edge;
+ UpdateType type;
+ List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
+ boolean notifyListeners = false, rv;
+
+ for (TopoEdgeUpdate etu : etuList) {
+ edge = etu.getEdge();
+ type = etu.getUpdateType();
+
+ // Update local cache
+ rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
+ if (rv) {
+ if (!notifyListeners) {
+ notifyListeners = true;
+ }
+ etuNotifyList.add(etu);
+ logger.debug(
+ "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+ }
- logger.trace(type + " Edge " + edge + " in container {}", container);
+ // Prepare to update TopologyService
+ if (notifyListeners) {
+ notifyQ.add(new NotifyEntry(container, etuNotifyList));
+ logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
+ }
}
@Override
NodeConnector src = edge.getTailNodeConnector(), dst = edge
.getHeadNodeConnector();
- if (!src.getType().equals(
- NodeConnector.NodeConnectorIDType.PRODUCTION)) {
+ if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
/* Find the common containers for both ends */
List<String> srcContainers = this.containerMap.get(src), dstContainers = this.containerMap
.get(dst), cmnContainers = null;
public String getHelp() {
StringBuffer help = new StringBuffer();
help.append("---Topology Service Shim---\n");
- help
- .append("\t pem [container] - Print edgeMap entries for a given container\n");
+ help.append("\t pem [container] - Print edgeMap entries");
+ help.append(" for a given container\n");
return help.toString();
}
}
ci.println("Container: " + container);
- ci
- .println(" Edge Bandwidth");
+ ci.println(" Edge Bandwidth");
Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
if (edgePropsMap == null) {
return;
}
+ int count = 0;
for (Pair<Edge, Set<Property>> edgeProps : edgePropsMap.values()) {
if (edgeProps == null) {
continue;
}
long bw = 0;
- for (Property prop : edgeProps.getRight()) {
- if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
- bw = ((Bandwidth) prop).getValue();
+ Set<Property> props = edgeProps.getRight();
+ if (props != null) {
+ for (Property prop : props) {
+ if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
+ bw = ((Bandwidth) prop).getValue();
+ }
}
}
-
+ count++;
ci.println(edgeProps.getLeft() + " " + bw);
}
+ ci.println("Total number of Edges: " + count);
}
public void _bwfactor(CommandInterpreter ci) {
}
/**
- * This method will trigger topology updates to be sent
- * toward SAL. SAL then pushes the updates to ALL the applications
- * that have registered as listeners for this service. SAL has no
- * way of knowing which application requested for the refresh.
+ * This method will trigger topology updates to be sent toward SAL. SAL then
+ * pushes the updates to ALL the applications that have registered as
+ * listeners for this service. SAL has no way of knowing which application
+ * requested for the refresh.
*
- * As an example of this case, is stopping and starting the
- * Topology Manager. When the topology Manager is stopped,
- * and restarted, it will no longer have the latest topology.
- * Hence, a request is sent here.
+ * As an example of this case, is stopping and starting the Topology
+ * Manager. When the topology Manager is stopped, and restarted, it will no
+ * longer have the latest topology. Hence, a request is sent here.
*
* @param containerName
* @return void
}
/**
- * Reading the current topology database, the method will replay
- * all the edge updates for the ITopologyServiceShimListener instance
- * in the given container, which will in turn publish them toward SAL.
+ * Reading the current topology database, the method will replay all the
+ * edge updates for the ITopologyServiceShimListener instance in the given
+ * container, which will in turn publish them toward SAL.
+ *
* @param containerName
*/
private void TopologyBulkUpdate(String containerName) {
return;
}
int i = 0;
+ List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
if (edgeProps != null) {
i++;
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
+ .getRight(), UpdateType.ADDED));
logger.trace("Add edge {}", edgeProps.getLeft());
- topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
- UpdateType.ADDED, edgeProps.getRight());
}
}
+ if (i > 0) {
+ topologServiceShimListener.edgeUpdate(teuList);
+ }
logger.debug("Sent {} updates", i);
}
+ @Override
+ public void updateNode(Node node, UpdateType type, Set<Property> props) {
+ }
+
+ @Override
+ public void updateNodeConnector(NodeConnector nodeConnector,
+ UpdateType type, Set<Property> props) {
+ List<String> containers = new ArrayList<String>();
+ List<String> conList = this.containerMap.get(nodeConnector);
+
+ containers.add(GlobalConstants.DEFAULT.toString());
+ if (conList != null) {
+ containers.addAll(conList);
+ }
+
+ switch (type) {
+ case ADDED:
+ break;
+ case CHANGED:
+ if (props == null) {
+ break;
+ }
+
+ boolean rmEdge = false;
+ for (Property prop : props) {
+ if (((prop instanceof Config) && (((Config) prop).getValue() != Config.ADMIN_UP))
+ || ((prop instanceof State) && (((State) prop)
+ .getValue() != State.EDGE_UP))) {
+ /*
+ * If port admin down or link down, remove the edges
+ * associated with the port
+ */
+ rmEdge = true;
+ break;
+ }
+ }
+
+ if (rmEdge) {
+ for (String cName : containers) {
+ removeNodeConnector(cName, nodeConnector);
+ }
+ }
+ break;
+ case REMOVED:
+ for (String cName : containers) {
+ removeNodeConnector(cName, nodeConnector);
+ }
+ break;
+ default:
+ break;
+ }
+ }
}